diff --git a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md index 8ece9020..58e85edf 100644 --- a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md +++ b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md @@ -71,6 +71,9 @@ 6. Adjust error handling to surface concurrency/locking issues with retries and user-friendly messages. ``` +**Stage Notes (in progress)** +- Refactoring work is underway to move REST endpoints onto `AssemblyCache` while we align on assembly GUID usage and domain permissions. Pending operator testing before tasks can be completed. + ## 3. Implement Dev Mode authorization and UX toggles [ ] Gate privileged writes behind Admin role + Dev Mode toggle. [ ] Store Dev Mode state server-side (per-user session or short-lived token) to prevent unauthorized access. diff --git a/Data/Engine/assembly_management/bootstrap.py b/Data/Engine/assembly_management/bootstrap.py index 1f415c96..a9f51160 100644 --- a/Data/Engine/assembly_management/bootstrap.py +++ b/Data/Engine/assembly_management/bootstrap.py @@ -9,6 +9,7 @@ from __future__ import annotations +import copy import logging import threading from pathlib import Path @@ -17,6 +18,7 @@ from typing import Dict, List, Mapping, Optional from .databases import AssemblyDatabaseManager from .models import AssemblyDomain, AssemblyRecord, CachedAssembly from .payloads import PayloadManager +from .sync import sync_official_domain class AssemblyCache: @@ -90,12 +92,29 @@ class AssemblyCache: for record in records: self._payload_manager.ensure_runtime_copy(record.payload) entry = CachedAssembly(domain=domain, record=record, is_dirty=False, last_persisted=record.updated_at) - self._store[record.assembly_id] = entry - self._domain_index[domain][record.assembly_id] = entry + self._store[record.assembly_guid] = entry + self._domain_index[domain][record.assembly_guid] = entry + + def get_entry(self, assembly_guid: str) -> Optional[CachedAssembly]: + """Return a defensive copy of the cached assembly.""" - def get(self, assembly_id: str) -> Optional[AssemblyRecord]: with self._lock: - entry = self._store.get(assembly_id) + entry = self._store.get(assembly_guid) + if entry is None: + return None + return self._clone_entry(entry) + + def list_entries(self, *, domain: Optional[AssemblyDomain] = None) -> List[CachedAssembly]: + """Return defensive copies of cached assemblies, optionally filtered by domain.""" + + with self._lock: + if domain is None: + return [self._clone_entry(entry) for entry in self._store.values()] + return [self._clone_entry(entry) for entry in self._domain_index[domain].values()] + + def get(self, assembly_guid: str) -> Optional[AssemblyRecord]: + with self._lock: + entry = self._store.get(assembly_guid) if not entry: return None return entry.record @@ -108,37 +127,37 @@ class AssemblyCache: def stage_upsert(self, domain: AssemblyDomain, record: AssemblyRecord) -> None: with self._lock: - entry = self._store.get(record.assembly_id) + entry = self._store.get(record.assembly_guid) if entry is None: entry = CachedAssembly(domain=domain, record=record, is_dirty=True) entry.mark_dirty() - self._store[record.assembly_id] = entry - self._domain_index[domain][record.assembly_id] = entry + self._store[record.assembly_guid] = entry + self._domain_index[domain][record.assembly_guid] = entry else: entry.domain = domain entry.record = record entry.mark_dirty() - self._pending_deletes.pop(record.assembly_id, None) - self._dirty[record.assembly_id] = entry + self._pending_deletes.pop(record.assembly_guid, None) + self._dirty[record.assembly_guid] = entry self._flush_event.set() - def stage_delete(self, assembly_id: str) -> None: + def stage_delete(self, assembly_guid: str) -> None: with self._lock: - entry = self._store.get(assembly_id) + entry = self._store.get(assembly_guid) if not entry: return entry.is_dirty = True - self._dirty.pop(assembly_id, None) - self._pending_deletes[assembly_id] = entry + self._dirty.pop(assembly_guid, None) + self._pending_deletes[assembly_guid] = entry self._flush_event.set() def describe(self) -> List[Dict[str, str]]: with self._lock: snapshot = [] - for assembly_id, entry in self._store.items(): + for assembly_guid, entry in self._store.items(): snapshot.append( { - "assembly_id": assembly_id, + "assembly_guid": assembly_guid, "domain": entry.domain.value, "is_dirty": "true" if entry.is_dirty else "false", "dirty_since": entry.dirty_since.isoformat() if entry.dirty_since else "", @@ -158,6 +177,23 @@ class AssemblyCache: if flush: self._flush_dirty_entries() + def read_payload_bytes(self, assembly_guid: str) -> bytes: + """Return the payload bytes for the specified assembly.""" + + with self._lock: + entry = self._store.get(assembly_guid) + if not entry: + raise KeyError(f"Assembly '{assembly_guid}' not found in cache") + return self._payload_manager.read_payload_bytes(entry.record.payload) + + @property + def payload_manager(self) -> PayloadManager: + return self._payload_manager + + @property + def database_manager(self) -> AssemblyDatabaseManager: + return self._db_manager + # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @@ -186,17 +222,17 @@ class AssemblyCache: self._db_manager.delete_record(entry.domain, entry) self._payload_manager.delete_payload(entry.record.payload) with self._lock: - self._store.pop(entry.record.assembly_id, None) - self._domain_index[entry.domain].pop(entry.record.assembly_id, None) + self._store.pop(entry.record.assembly_guid, None) + self._domain_index[entry.domain].pop(entry.record.assembly_guid, None) except Exception as exc: self._logger.error( "Failed to delete assembly %s in domain %s: %s", - entry.record.assembly_id, + entry.record.assembly_guid, entry.domain.value, exc, ) with self._lock: - self._pending_deletes[entry.record.assembly_id] = entry + self._pending_deletes[entry.record.assembly_guid] = entry return for entry in dirty_items: @@ -207,14 +243,24 @@ class AssemblyCache: except Exception as exc: self._logger.error( "Failed to flush assembly %s in domain %s: %s", - entry.record.assembly_id, + entry.record.assembly_guid, entry.domain.value, exc, ) with self._lock: - self._dirty[entry.record.assembly_id] = entry + self._dirty[entry.record.assembly_guid] = entry break + def _clone_entry(self, entry: CachedAssembly) -> CachedAssembly: + record_copy = copy.deepcopy(entry.record) + return CachedAssembly( + domain=entry.domain, + record=record_copy, + is_dirty=entry.is_dirty, + last_persisted=entry.last_persisted, + dirty_since=entry.dirty_since, + ) + def initialise_assembly_runtime( *, @@ -232,6 +278,10 @@ def initialise_assembly_runtime( db_manager.initialise() payload_manager = PayloadManager(staging_root=payload_staging, runtime_root=payload_runtime, logger=logger) + try: + sync_official_domain(db_manager, payload_manager, staging_root, logger=logger) + except Exception: # pragma: no cover - best effort during bootstrap + (logger or logging.getLogger(__name__)).exception("Official assembly sync failed during startup.") flush_interval = _resolve_flush_interval(config) return AssemblyCache.initialise( @@ -275,4 +325,3 @@ def _discover_staging_root() -> Path: if data_dir.is_dir(): return data_dir.resolve() raise RuntimeError("Could not locate staging assemblies directory (expected Data/Engine/Assemblies).") - diff --git a/Data/Engine/assembly_management/databases.py b/Data/Engine/assembly_management/databases.py index 1a0d1d77..8e367de7 100644 --- a/Data/Engine/assembly_management/databases.py +++ b/Data/Engine/assembly_management/databases.py @@ -22,34 +22,27 @@ from .models import AssemblyDomain, AssemblyRecord, CachedAssembly, PayloadDescr _SCHEMA_STATEMENTS: Iterable[str] = ( - """ - CREATE TABLE IF NOT EXISTS payloads ( - payload_guid TEXT PRIMARY KEY, - payload_type TEXT NOT NULL, - file_name TEXT NOT NULL, - file_extension TEXT NOT NULL, - size_bytes INTEGER NOT NULL DEFAULT 0, - checksum TEXT, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL - ) - """, """ CREATE TABLE IF NOT EXISTS assemblies ( - assembly_id TEXT PRIMARY KEY, + assembly_guid TEXT PRIMARY KEY, display_name TEXT NOT NULL, summary TEXT, category TEXT, assembly_kind TEXT NOT NULL, assembly_type TEXT, version INTEGER NOT NULL DEFAULT 1, - payload_guid TEXT NOT NULL, metadata_json TEXT, tags_json TEXT, checksum TEXT, + payload_type TEXT NOT NULL, + payload_file_name TEXT NOT NULL, + payload_file_extension TEXT NOT NULL, + payload_size_bytes INTEGER NOT NULL DEFAULT 0, + payload_checksum TEXT, + payload_created_at TEXT NOT NULL, + payload_updated_at TEXT NOT NULL, created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - FOREIGN KEY(payload_guid) REFERENCES payloads(payload_guid) ON DELETE CASCADE + updated_at TEXT NOT NULL ) """, "CREATE INDEX IF NOT EXISTS idx_assemblies_kind ON assemblies(assembly_kind)", @@ -87,6 +80,14 @@ class AssemblyDatabaseManager: runtime = (self._runtime_root / domain.database_name).resolve() self._paths[domain] = AssemblyDatabasePaths(staging=staging, runtime=runtime) + @property + def staging_root(self) -> Path: + return self._staging_root + + @property + def runtime_root(self) -> Path: + return self._runtime_root + # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ @@ -102,6 +103,18 @@ class AssemblyDatabaseManager: conn.close() self._mirror_database(domain) + def reset_domain(self, domain: AssemblyDomain) -> None: + """Remove all assemblies and payload metadata for the specified domain.""" + + conn = self._open_connection(domain) + try: + cur = conn.cursor() + cur.execute("DELETE FROM assemblies") + conn.commit() + finally: + conn.close() + self._mirror_database(domain) + def load_all(self, domain: AssemblyDomain) -> List[AssemblyRecord]: """Load all assembly records for the given domain.""" @@ -111,29 +124,26 @@ class AssemblyDatabaseManager: cur.execute( """ SELECT - a.assembly_id AS assembly_id, - a.display_name AS display_name, - a.summary AS summary, - a.category AS category, - a.assembly_kind AS assembly_kind, - a.assembly_type AS assembly_type, - a.version AS version, - a.payload_guid AS payload_guid, - a.metadata_json AS metadata_json, - a.tags_json AS tags_json, - a.checksum AS assembly_checksum, - a.created_at AS assembly_created_at, - a.updated_at AS assembly_updated_at, - p.payload_guid AS payload_guid, - p.payload_type AS payload_type, - p.file_name AS payload_file_name, - p.file_extension AS payload_file_extension, - p.size_bytes AS payload_size_bytes, - p.checksum AS payload_checksum, - p.created_at AS payload_created_at, - p.updated_at AS payload_updated_at - FROM assemblies AS a - JOIN payloads AS p ON p.payload_guid = a.payload_guid + assembly_guid, + display_name, + summary, + category, + assembly_kind, + assembly_type, + version, + metadata_json, + tags_json, + checksum AS assembly_checksum, + payload_type, + payload_file_name, + payload_file_extension, + payload_size_bytes, + payload_checksum, + payload_created_at, + payload_updated_at, + created_at AS assembly_created_at, + updated_at AS assembly_updated_at + FROM assemblies """ ) records: List[AssemblyRecord] = [] @@ -144,7 +154,7 @@ class AssemblyDatabaseManager: except Exception: payload_type = PayloadType.UNKNOWN payload = PayloadDescriptor( - guid=row["payload_guid"], + assembly_guid=row["assembly_guid"], payload_type=payload_type, file_name=row["payload_file_name"], file_extension=row["payload_file_extension"], @@ -164,7 +174,7 @@ class AssemblyDatabaseManager: except Exception: tags = {} record = AssemblyRecord( - assembly_id=row["assembly_id"], + assembly_guid=row["assembly_guid"], display_name=row["display_name"], summary=row["summary"], category=row["category"], @@ -191,20 +201,62 @@ class AssemblyDatabaseManager: try: cur = conn.cursor() payload = record.payload + metadata_json = json.dumps(record.metadata or {}) + tags_json = json.dumps(record.tags or {}) cur.execute( """ - INSERT INTO payloads (payload_guid, payload_type, file_name, file_extension, size_bytes, checksum, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(payload_guid) DO UPDATE SET - payload_type = excluded.payload_type, - file_name = excluded.file_name, - file_extension = excluded.file_extension, - size_bytes = excluded.size_bytes, + INSERT INTO assemblies ( + assembly_guid, + display_name, + summary, + category, + assembly_kind, + assembly_type, + version, + metadata_json, + tags_json, + checksum, + payload_type, + payload_file_name, + payload_file_extension, + payload_size_bytes, + payload_checksum, + payload_created_at, + payload_updated_at, + created_at, + updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(assembly_guid) DO UPDATE SET + display_name = excluded.display_name, + summary = excluded.summary, + category = excluded.category, + assembly_kind = excluded.assembly_kind, + assembly_type = excluded.assembly_type, + version = excluded.version, + metadata_json = excluded.metadata_json, + tags_json = excluded.tags_json, checksum = excluded.checksum, + payload_type = excluded.payload_type, + payload_file_name = excluded.payload_file_name, + payload_file_extension = excluded.payload_file_extension, + payload_size_bytes = excluded.payload_size_bytes, + payload_checksum = excluded.payload_checksum, + payload_created_at = excluded.payload_created_at, + payload_updated_at = excluded.payload_updated_at, updated_at = excluded.updated_at """, ( - payload.guid, + record.assembly_guid, + record.display_name, + record.summary, + record.category, + record.assembly_kind, + record.assembly_type, + record.version, + metadata_json, + tags_json, + record.checksum, payload.payload_type.value, payload.file_name, payload.file_extension, @@ -212,53 +264,6 @@ class AssemblyDatabaseManager: payload.checksum, payload.created_at.isoformat(), payload.updated_at.isoformat(), - ), - ) - metadata_json = json.dumps(record.metadata or {}) - tags_json = json.dumps(record.tags or {}) - cur.execute( - """ - INSERT INTO assemblies ( - assembly_id, - display_name, - summary, - category, - assembly_kind, - assembly_type, - version, - payload_guid, - metadata_json, - tags_json, - checksum, - created_at, - updated_at - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(assembly_id) DO UPDATE SET - display_name = excluded.display_name, - summary = excluded.summary, - category = excluded.category, - assembly_kind = excluded.assembly_kind, - assembly_type = excluded.assembly_type, - version = excluded.version, - payload_guid = excluded.payload_guid, - metadata_json = excluded.metadata_json, - tags_json = excluded.tags_json, - checksum = excluded.checksum, - updated_at = excluded.updated_at - """, - ( - record.assembly_id, - record.display_name, - record.summary, - record.category, - record.assembly_kind, - record.assembly_type, - record.version, - payload.guid, - metadata_json, - tags_json, - record.checksum, record.created_at.isoformat(), record.updated_at.isoformat(), ), @@ -275,7 +280,7 @@ class AssemblyDatabaseManager: conn = self._open_connection(domain) try: cur = conn.cursor() - cur.execute("DELETE FROM assemblies WHERE assembly_id = ?", (record.assembly_id,)) + cur.execute("DELETE FROM assemblies WHERE assembly_guid = ?", (record.assembly_guid,)) conn.commit() finally: conn.close() @@ -326,4 +331,3 @@ class AssemblyDatabaseManager: runtime_candidate, exc, ) - diff --git a/Data/Engine/assembly_management/models.py b/Data/Engine/assembly_management/models.py index 467a16e1..e3929d9f 100644 --- a/Data/Engine/assembly_management/models.py +++ b/Data/Engine/assembly_management/models.py @@ -45,7 +45,7 @@ class PayloadType(str, Enum): class PayloadDescriptor: """Represents on-disk payload material referenced by an assembly.""" - guid: str + assembly_guid: str payload_type: PayloadType file_name: str file_extension: str @@ -56,7 +56,7 @@ class PayloadDescriptor: def as_dict(self) -> Dict[str, Any]: return { - "guid": self.guid, + "assembly_guid": self.assembly_guid, "payload_type": self.payload_type.value, "file_name": self.file_name, "file_extension": self.file_extension, @@ -66,12 +66,18 @@ class PayloadDescriptor: "updated_at": self.updated_at.isoformat(), } + @property + def guid(self) -> str: + """Backwards-compatible accessor for legacy references.""" + + return self.assembly_guid + @dataclass(slots=True) class AssemblyRecord: """Represents an assembly row hydrated from persistence.""" - assembly_id: str + assembly_guid: str display_name: str summary: Optional[str] category: Optional[str] @@ -106,4 +112,3 @@ class CachedAssembly: self.is_dirty = False self.last_persisted = now self.dirty_since = None - diff --git a/Data/Engine/assembly_management/payloads.py b/Data/Engine/assembly_management/payloads.py index 51ad81d4..acfb938d 100644 --- a/Data/Engine/assembly_management/payloads.py +++ b/Data/Engine/assembly_management/payloads.py @@ -38,12 +38,12 @@ class PayloadManager: payload_type: PayloadType, content: Union[str, bytes], *, - guid: Optional[str] = None, + assembly_guid: Optional[str] = None, extension: Optional[str] = None, ) -> PayloadDescriptor: """Persist payload content and mirror it to the runtime directory.""" - resolved_guid = self._normalise_guid(guid or uuid.uuid4().hex) + resolved_guid = self._normalise_guid(assembly_guid or uuid.uuid4().hex) resolved_extension = self._normalise_extension(extension or self._default_extension(payload_type)) now = _dt.datetime.utcnow() data = content.encode("utf-8") if isinstance(content, str) else bytes(content) @@ -67,7 +67,7 @@ class PayloadManager: self._logger.debug("Failed to mirror payload %s to runtime copy: %s", resolved_guid, exc) descriptor = PayloadDescriptor( - guid=resolved_guid, + assembly_guid=resolved_guid, payload_type=payload_type, file_name=file_name, file_extension=resolved_extension, @@ -85,8 +85,8 @@ class PayloadManager: checksum = hashlib.sha256(data).hexdigest() now = _dt.datetime.utcnow() - staging_path = self._payload_dir(self._staging_root, descriptor.guid) / descriptor.file_name - runtime_path = self._payload_dir(self._runtime_root, descriptor.guid) / descriptor.file_name + staging_path = self._payload_dir(self._staging_root, descriptor.assembly_guid) / descriptor.file_name + runtime_path = self._payload_dir(self._runtime_root, descriptor.assembly_guid) / descriptor.file_name staging_path.parent.mkdir(parents=True, exist_ok=True) runtime_path.parent.mkdir(parents=True, exist_ok=True) @@ -96,7 +96,7 @@ class PayloadManager: try: shutil.copy2(staging_path, runtime_path) except Exception as exc: # pragma: no cover - best effort mirror - self._logger.debug("Failed to mirror payload %s during update: %s", descriptor.guid, exc) + self._logger.debug("Failed to mirror payload %s during update: %s", descriptor.assembly_guid, exc) descriptor.size_bytes = len(data) descriptor.checksum = checksum @@ -106,28 +106,28 @@ class PayloadManager: def read_payload_bytes(self, descriptor: PayloadDescriptor) -> bytes: """Retrieve payload content from the staging copy.""" - staging_path = self._payload_dir(self._staging_root, descriptor.guid) / descriptor.file_name + staging_path = self._payload_dir(self._staging_root, descriptor.assembly_guid) / descriptor.file_name return staging_path.read_bytes() def ensure_runtime_copy(self, descriptor: PayloadDescriptor) -> None: """Ensure the runtime payload copy matches the staging content.""" - staging_path = self._payload_dir(self._staging_root, descriptor.guid) / descriptor.file_name - runtime_path = self._payload_dir(self._runtime_root, descriptor.guid) / descriptor.file_name + staging_path = self._payload_dir(self._staging_root, descriptor.assembly_guid) / descriptor.file_name + runtime_path = self._payload_dir(self._runtime_root, descriptor.assembly_guid) / descriptor.file_name if not staging_path.exists(): - self._logger.warning("Payload missing on disk; guid=%s path=%s", descriptor.guid, staging_path) + self._logger.warning("Payload missing on disk; guid=%s path=%s", descriptor.assembly_guid, staging_path) return runtime_path.parent.mkdir(parents=True, exist_ok=True) try: shutil.copy2(staging_path, runtime_path) except Exception as exc: # pragma: no cover - best effort mirror - self._logger.debug("Failed to mirror payload %s via ensure_runtime_copy: %s", descriptor.guid, exc) + self._logger.debug("Failed to mirror payload %s via ensure_runtime_copy: %s", descriptor.assembly_guid, exc) def delete_payload(self, descriptor: PayloadDescriptor) -> None: """Remove staging and runtime payload files.""" for root in (self._staging_root, self._runtime_root): - dir_path = self._payload_dir(root, descriptor.guid) + dir_path = self._payload_dir(root, descriptor.assembly_guid) file_path = dir_path / descriptor.file_name try: if file_path.exists(): @@ -135,7 +135,9 @@ class PayloadManager: if dir_path.exists() and not any(dir_path.iterdir()): dir_path.rmdir() except Exception as exc: # pragma: no cover - best effort cleanup - self._logger.debug("Failed to remove payload directory %s (%s): %s", descriptor.guid, root, exc) + self._logger.debug( + "Failed to remove payload directory %s (%s): %s", descriptor.assembly_guid, root, exc + ) # ------------------------------------------------------------------ # Helper methods @@ -163,4 +165,3 @@ class PayloadManager: @staticmethod def _normalise_guid(guid: str) -> str: return guid.strip().lower() - diff --git a/Data/Engine/assembly_management/sync.py b/Data/Engine/assembly_management/sync.py new file mode 100644 index 00000000..3b888726 --- /dev/null +++ b/Data/Engine/assembly_management/sync.py @@ -0,0 +1,215 @@ +# ====================================================== +# Data\Engine\assembly_management\sync.py +# Description: Synchronises assembly databases from staged filesystem sources (official domain importer). +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Synchronisation helpers for assembly persistence domains.""" + +from __future__ import annotations + +import datetime as _dt +import hashlib +import json +import logging +from pathlib import Path +from typing import Iterable, Optional, Tuple + +from .databases import AssemblyDatabaseManager +from .models import AssemblyDomain, AssemblyRecord, CachedAssembly, PayloadType +from .payloads import PayloadManager + + +_SCRIPT_DIRS = {"scripts", "script"} +_WORKFLOW_DIRS = {"workflows", "workflow"} +_ANSIBLE_DIRS = {"ansible_playbooks", "ansible-playbooks", "ansible"} + + +def sync_official_domain( + db_manager: AssemblyDatabaseManager, + payload_manager: PayloadManager, + staging_root: Path, + *, + logger: Optional[logging.Logger] = None, +) -> None: + """Repopulate the official domain database from staged JSON assemblies.""" + + log = logger or logging.getLogger(__name__) + root = staging_root.resolve() + if not root.is_dir(): + log.warning("Assembly staging root missing during official sync: %s", root) + return + + files = tuple(_iter_assembly_sources(root)) + if not files: + log.info("No staged assemblies discovered for official sync; clearing domain.") + db_manager.reset_domain(AssemblyDomain.OFFICIAL) + return + + db_manager.reset_domain(AssemblyDomain.OFFICIAL) + + imported = 0 + skipped = 0 + + for rel_path, file_path in files: + record = _record_from_file(rel_path, file_path, payload_manager, log) + if record is None: + skipped += 1 + continue + entry = CachedAssembly( + domain=AssemblyDomain.OFFICIAL, + record=record, + is_dirty=False, + last_persisted=record.updated_at, + ) + try: + db_manager.upsert_record(AssemblyDomain.OFFICIAL, entry) + imported += 1 + except Exception: # pragma: no cover - defensive logging + skipped += 1 + log.exception("Failed to import assembly %s during official sync.", rel_path) + + log.info( + "Official assembly sync complete: imported=%s skipped=%s source_root=%s", + imported, + skipped, + root, + ) + + +def _iter_assembly_sources(root: Path) -> Iterable[Tuple[str, Path]]: + for path in root.rglob("*.json"): + if not path.is_file(): + continue + rel_path = path.relative_to(root).as_posix() + yield rel_path, path + + +def _record_from_file( + rel_path: str, + file_path: Path, + payload_manager: PayloadManager, + logger: logging.Logger, +) -> Optional[AssemblyRecord]: + try: + text = file_path.read_text(encoding="utf-8") + except Exception as exc: + logger.warning("Unable to read assembly source %s: %s", file_path, exc) + return None + + try: + document = json.loads(text) + except Exception as exc: + logger.warning("Invalid JSON for assembly %s: %s", file_path, exc) + return None + + kind = _infer_kind(rel_path) + if kind == "unknown": + logger.debug("Skipping non-assembly file %s", rel_path) + return None + + payload_type = _payload_type_for_kind(kind) + guid = hashlib.sha1(rel_path.encode("utf-8")).hexdigest() + descriptor = payload_manager.store_payload(payload_type, text, assembly_guid=guid, extension=".json") + + file_stat = file_path.stat() + timestamp = _dt.datetime.utcfromtimestamp(file_stat.st_mtime).replace(microsecond=0) + descriptor.created_at = timestamp + descriptor.updated_at = timestamp + + assembly_id = _assembly_id_from_path(rel_path) + document_metadata = _metadata_from_document(kind, document, rel_path) + tags = _coerce_dict(document.get("tags")) + + record = AssemblyRecord( + assembly_id=assembly_id, + display_name=document_metadata.get("display_name") or assembly_id.rsplit("/", 1)[-1], + summary=document_metadata.get("summary"), + category=document_metadata.get("category"), + assembly_kind=kind, + assembly_type=document_metadata.get("assembly_type"), + version=_coerce_int(document.get("version"), default=1), + payload=descriptor, + metadata=document_metadata, + tags=tags, + checksum=hashlib.sha256(text.encode("utf-8")).hexdigest(), + created_at=timestamp, + updated_at=timestamp, + ) + return record + + +def _infer_kind(rel_path: str) -> str: + first = rel_path.split("/", 1)[0].strip().lower() + if first in _SCRIPT_DIRS: + return "script" + if first in _WORKFLOW_DIRS: + return "workflow" + if first in _ANSIBLE_DIRS: + return "ansible" + return "unknown" + + +def _payload_type_for_kind(kind: str) -> PayloadType: + if kind == "workflow": + return PayloadType.WORKFLOW + if kind == "script": + return PayloadType.SCRIPT + if kind == "ansible": + return PayloadType.BINARY + return PayloadType.UNKNOWN + + +def _assembly_id_from_path(rel_path: str) -> str: + normalised = rel_path.replace("\\", "/") + if normalised.lower().endswith(".json"): + normalised = normalised[:-5] + return normalised + + +def _metadata_from_document(kind: str, document: dict, rel_path: str) -> dict: + metadata = { + "source_path": rel_path, + "display_name": None, + "summary": None, + "category": None, + "assembly_type": None, + } + + if kind == "workflow": + metadata["display_name"] = document.get("tab_name") or document.get("name") + metadata["summary"] = document.get("description") + metadata["category"] = "workflow" + metadata["assembly_type"] = "workflow" + elif kind == "script": + metadata["display_name"] = document.get("name") or document.get("display_name") + metadata["summary"] = document.get("description") + metadata["category"] = (document.get("category") or "script").lower() + metadata["assembly_type"] = (document.get("type") or "powershell").lower() + elif kind == "ansible": + metadata["display_name"] = document.get("name") or document.get("display_name") or rel_path.rsplit("/", 1)[-1] + metadata["summary"] = document.get("description") + metadata["category"] = "ansible" + metadata["assembly_type"] = "ansible" + + metadata.update( + { + "sites": document.get("sites"), + "variables": document.get("variables"), + "files": document.get("files"), + } + ) + metadata["display_name"] = metadata.get("display_name") or rel_path.rsplit("/", 1)[-1] + return metadata + + +def _coerce_int(value, *, default: int = 0) -> int: + try: + return int(value) + except Exception: + return default + + +def _coerce_dict(value) -> dict: + return value if isinstance(value, dict) else {} diff --git a/Data/Engine/services/API/assemblies/management.py b/Data/Engine/services/API/assemblies/management.py index f839b42a..f9c9ad92 100644 --- a/Data/Engine/services/API/assemblies/management.py +++ b/Data/Engine/services/API/assemblies/management.py @@ -1,732 +1,303 @@ # ====================================================== # Data\Engine\services\API\assemblies\management.py -# Description: Assembly CRUD endpoints for workflows, scripts, and Ansible documents during the Engine migration. +# Description: Assembly REST API routes backed by AssemblyCache for multi-domain persistence. # # API Endpoints (if applicable): -# - POST /api/assembly/create (Token Authenticated) - Creates a folder or assembly file within the requested island. -# - POST /api/assembly/edit (Token Authenticated) - Replaces the contents of an existing assembly. -# - POST /api/assembly/rename (Token Authenticated) - Renames an assembly file or folder. -# - POST /api/assembly/move (Token Authenticated) - Moves an assembly file or folder to a new location. -# - POST /api/assembly/delete (Token Authenticated) - Deletes an assembly file or folder. -# - GET /api/assembly/list (Token Authenticated) - Lists assemblies and folders for a given island. -# - GET /api/assembly/load (Token Authenticated) - Loads an assembly file and returns normalized metadata. +# - GET /api/assemblies (Token Authenticated) - Lists assemblies with domain/source metadata. +# - GET /api/assemblies/ (Token Authenticated) - Returns assembly metadata and payload reference. +# - POST /api/assemblies (Token Authenticated) - Creates a new assembly within the allowed domain. +# - PUT /api/assemblies/ (Token Authenticated) - Updates an existing assembly and stages persistence. +# - DELETE /api/assemblies/ (Token Authenticated) - Marks an assembly for deletion. +# - POST /api/assemblies//clone (Token Authenticated (Admin+Dev Mode for non-user domains)) - Clones an assembly into a target domain. +# - POST /api/assemblies/dev-mode/switch (Token Authenticated (Admin)) - Enables or disables Dev Mode overrides for the current session. +# - POST /api/assemblies/dev-mode/write (Token Authenticated (Admin+Dev Mode)) - Flushes queued assembly writes immediately. +# - POST /api/assemblies/official/sync (Token Authenticated (Admin+Dev Mode)) - Rebuilds the official domain from staged JSON assemblies. # ====================================================== -"""Assembly management endpoints for the Borealis Engine API.""" +"""Assembly CRUD REST endpoints backed by AssemblyCache.""" + from __future__ import annotations -import base64 -import json import logging import os -import re -import shutil -import time -from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, List, Mapping, MutableMapping, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple -from flask import Blueprint, jsonify, request +from flask import Blueprint, jsonify, request, session +from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer -from . import execution as assemblies_execution +from Data.Engine.assembly_management.models import AssemblyDomain +from ..assemblies.service import AssemblyRuntimeService if TYPE_CHECKING: # pragma: no cover - typing aide from .. import EngineServiceAdapters -_ISLAND_DIR_MAP: Mapping[str, str] = { - "workflows": "Workflows", - "workflow": "Workflows", - "scripts": "Scripts", - "script": "Scripts", - "ansible": "Ansible_Playbooks", - "ansible_playbooks": "Ansible_Playbooks", - "ansible-playbooks": "Ansible_Playbooks", - "playbooks": "Ansible_Playbooks", -} +class AssemblyAPIService: + """Facilitates assembly API routes with authentication and permission checks.""" -_BASE64_CLEANER = re.compile(r"\s+") - - - -class AssemblyManagementService: - """Implements assembly CRUD helpers for Engine routes.""" - - def __init__(self, adapters: "EngineServiceAdapters") -> None: + def __init__(self, app, adapters: "EngineServiceAdapters") -> None: + self.app = app self.adapters = adapters self.logger = adapters.context.logger or logging.getLogger(__name__) - self.service_log = adapters.service_log - self._base_root = self._discover_assemblies_root() - self._log_action("init", f"assemblies root set to {self._base_root}") - - def _discover_assemblies_root(self) -> Path: - module_path = Path(__file__).resolve() - for candidate in (module_path, *module_path.parents): - engine_dir = candidate / "Engine" - assemblies_dir = engine_dir / "Assemblies" - if assemblies_dir.is_dir(): - return assemblies_dir.resolve() - - raise RuntimeError("Engine assemblies directory not found; expected /Engine/Assemblies.") + cache = adapters.context.assembly_cache + if cache is None: + raise RuntimeError("Assembly cache not initialised; ensure Engine bootstrap executed.") + self.runtime = AssemblyRuntimeService(cache, logger=self.logger) # ------------------------------------------------------------------ - # Path helpers + # Authentication helpers # ------------------------------------------------------------------ - def _normalize_relpath(self, value: str) -> str: - return (value or "").replace("\\", "/").strip("/") + def require_user(self) -> Tuple[Optional[Dict[str, Any]], Optional[Tuple[Dict[str, Any], int]]]: + user = self._current_user() + if not user: + return None, ({"error": "unauthorized"}, 401) + return user, None - def _resolve_island_root(self, island: str) -> Optional[str]: - subdir = _ISLAND_DIR_MAP.get((island or "").strip().lower()) - if not subdir: + def require_admin(self, *, dev_mode_required: bool = False) -> Optional[Tuple[Dict[str, Any], int]]: + user = self._current_user() + if not user: + return {"error": "unauthorized"}, 401 + if not self._is_admin(user): + return {"error": "admin required"}, 403 + if dev_mode_required and not self._dev_mode_enabled(): + return {"error": "dev mode required"}, 403 + return None + + def require_mutation_for_domain(self, domain: AssemblyDomain) -> Optional[Tuple[Dict[str, Any], int]]: + user, error = self.require_user() + if error: + return error + if domain == AssemblyDomain.USER: return None - root = (self._base_root / subdir).resolve() - return str(root) + if not self._is_admin(user): + return {"error": "admin required for non-user domains"}, 403 + if not self._dev_mode_enabled(): + return {"error": "dev mode required for privileged domains"}, 403 + return None - def _resolve_assembly_path(self, island: str, rel_path: str) -> Tuple[str, str, str]: - root = self._resolve_island_root(island) - if not root: - raise ValueError("invalid island") - rel_norm = self._normalize_relpath(rel_path) - abs_path = os.path.abspath(os.path.join(root, rel_norm)) - if not abs_path.startswith(root): - raise ValueError("invalid path") - return root, abs_path, rel_norm + def _token_serializer(self) -> URLSafeTimedSerializer: + secret = self.app.secret_key or "borealis-dev-secret" + return URLSafeTimedSerializer(secret, salt="borealis-auth") - # ------------------------------------------------------------------ - # Document helpers - # ------------------------------------------------------------------ - def _default_ext_for_island(self, island: str, item_type: str = "") -> str: - isl = (island or "").lower().strip() - if isl in ("workflows", "workflow"): - return ".json" - if isl in ("ansible", "ansible_playbooks", "ansible-playbooks", "playbooks"): - return ".json" - if isl in ("scripts", "script"): - return ".json" - typ = (item_type or "").lower().strip() - if typ in ("bash", "batch", "powershell"): - return ".json" - return ".json" + def _current_user(self) -> Optional[Dict[str, Any]]: + username = session.get("username") + role = session.get("role") or "User" + if username: + return {"username": username, "role": role} - def _default_type_for_island(self, island: str, item_type: str = "") -> str: - isl = (island or "").lower().strip() - if isl in ("ansible", "ansible_playbooks", "ansible-playbooks", "playbooks"): - return "ansible" - typ = (item_type or "").lower().strip() - if typ in ("powershell", "batch", "bash", "ansible"): - return typ - return "powershell" - - def _empty_document(self, default_type: str = "powershell") -> Dict[str, Any]: - return { - "version": 1, - "name": "", - "description": "", - "category": "application" if (default_type or "").lower() == "ansible" else "script", - "type": default_type or "powershell", - "script": "", - "timeout_seconds": 3600, - "sites": {"mode": "all", "values": []}, - "variables": [], - "files": [], - } - - def _decode_base64_text(self, value: Any) -> Optional[str]: - if not isinstance(value, str): + token = self._bearer_token() + if not token: return None - stripped = value.strip() - if not stripped: - return "" + max_age = int(os.environ.get("BOREALIS_TOKEN_TTL_SECONDS", 60 * 60 * 24 * 30)) try: - cleaned = _BASE64_CLEANER.sub("", stripped) - except Exception: - cleaned = stripped - try: - decoded = base64.b64decode(cleaned, validate=True) - except Exception: + data = self._token_serializer().loads(token, max_age=max_age) + username = data.get("u") + role = data.get("r") or "User" + if username: + return {"username": username, "role": role} + except (BadSignature, SignatureExpired, Exception): return None - try: - return decoded.decode("utf-8") - except Exception: - return decoded.decode("utf-8", errors="replace") + return None - def _decode_script_content(self, value: Any, encoding_hint: str = "") -> str: - encoding = (encoding_hint or "").strip().lower() - if isinstance(value, str): - if encoding in ("base64", "b64", "base-64"): - decoded = self._decode_base64_text(value) - if decoded is not None: - return decoded.replace("\r\n", "\n") - decoded = self._decode_base64_text(value) - if decoded is not None: - return decoded.replace("\r\n", "\n") - return value.replace("\r\n", "\n") - return "" + def _bearer_token(self) -> Optional[str]: + auth_header = request.headers.get("Authorization") or "" + if auth_header.lower().startswith("bearer "): + return auth_header.split(" ", 1)[1].strip() + cookie_token = request.cookies.get("borealis_auth") + if cookie_token: + return cookie_token + return None - def _encode_script_content(self, script_text: Any) -> str: - if not isinstance(script_text, str): - script_text = "" if script_text is None else str(script_text) - normalized = script_text.replace("\r\n", "\n") - if not normalized: - return "" - encoded = base64.b64encode(normalized.encode("utf-8")) - return encoded.decode("ascii") + @staticmethod + def _is_admin(user: Dict[str, Any]) -> bool: + role = (user.get("role") or "").strip().lower() + return role == "admin" - def _prepare_storage(self, doc: Dict[str, Any]) -> Dict[str, Any]: - stored: Dict[str, Any] = {} - for key, value in (doc or {}).items(): - if key == "script": - stored[key] = self._encode_script_content(value) - else: - stored[key] = value - stored["script_encoding"] = "base64" - return stored - - def _normalize_document(self, obj: Any, default_type: str, base_name: str) -> Dict[str, Any]: - doc = self._empty_document(default_type) - if not isinstance(obj, dict): - obj = {} - base = (base_name or "assembly").strip() - doc["name"] = str(obj.get("name") or obj.get("display_name") or base) - doc["description"] = str(obj.get("description") or "") - category = str(obj.get("category") or doc["category"]).strip().lower() - if category in ("script", "application"): - doc["category"] = category - typ = str(obj.get("type") or obj.get("script_type") or default_type or "powershell").strip().lower() - if typ in ("powershell", "batch", "bash", "ansible"): - doc["type"] = typ - script_val = obj.get("script") - content_val = obj.get("content") - script_lines = obj.get("script_lines") - if isinstance(script_lines, list): - try: - doc["script"] = "\n".join(str(line) for line in script_lines) - except Exception: - doc["script"] = "" - elif isinstance(script_val, str): - doc["script"] = script_val - elif isinstance(content_val, str): - doc["script"] = content_val - else: - doc["script"] = "" - - encoding_hint = str(obj.get("script_encoding") or obj.get("scriptEncoding") or "").strip().lower() - doc["script"] = self._decode_script_content(doc.get("script"), encoding_hint) - if encoding_hint in ("base64", "b64", "base-64"): - doc["script_encoding"] = "base64" - else: - doc["script_encoding"] = "plain" - - timeout = obj.get("timeout_seconds") - if isinstance(timeout, (int, float)) and timeout > 0: - doc["timeout_seconds"] = int(timeout) - - sites = obj.get("sites") - if isinstance(sites, dict): - mode = str(sites.get("mode") or doc["sites"]["mode"]).strip().lower() - if mode in ("all", "include", "exclude"): - doc["sites"]["mode"] = mode - values = sites.get("values") - if isinstance(values, list): - doc["sites"]["values"] = [str(v) for v in values if str(v).strip()] - - variables = obj.get("variables") or obj.get("variable_definitions") - if isinstance(variables, list): - normalized_vars: List[Dict[str, Any]] = [] - for entry in variables: - if not isinstance(entry, dict): - continue - normalized_vars.append( - { - "name": str(entry.get("name") or entry.get("variable") or "").strip(), - "label": str(entry.get("label") or "").strip(), - "description": str(entry.get("description") or "").strip(), - "type": str(entry.get("type") or "string").strip().lower() or "string", - "default": entry.get("default"), - "required": bool(entry.get("required")), - } - ) - doc["variables"] = normalized_vars - - files = obj.get("files") - if isinstance(files, list): - normalized_files: List[Dict[str, Any]] = [] - for entry in files: - if not isinstance(entry, dict): - continue - normalized_files.append( - { - "file_name": str(entry.get("file_name") or entry.get("name") or "").strip(), - "content": entry.get("content") or "", - } - ) - doc["files"] = normalized_files - - return doc - - def _safe_read_json(self, path: str) -> Dict[str, Any]: - try: - with open(path, "r", encoding="utf-8") as handle: - return json.load(handle) - except Exception: - return {} - - def _extract_tab_name(self, obj: Mapping[str, Any]) -> str: - if not isinstance(obj, Mapping): - return "" - for key in ("tabName", "tab_name", "name", "title"): - val = obj.get(key) - if isinstance(val, str) and val.strip(): - return val.strip() - return "" - - def _detect_script_type(self, filename: str) -> str: - lower = (filename or "").lower() - if lower.endswith(".json") and os.path.isfile(filename): - try: - obj = self._safe_read_json(filename) - if isinstance(obj, dict): - typ = str(obj.get("type") or obj.get("script_type") or "").strip().lower() - if typ in ("powershell", "batch", "bash", "ansible"): - return typ - except Exception: - pass - return "powershell" - if lower.endswith(".yml"): - return "ansible" - if lower.endswith(".ps1"): - return "powershell" - if lower.endswith(".bat"): - return "batch" - if lower.endswith(".sh"): - return "bash" - return "unknown" - - def _load_assembly_document(self, abs_path: str, island: str, type_hint: str = "") -> Dict[str, Any]: - base_name = os.path.splitext(os.path.basename(abs_path))[0] - default_type = self._default_type_for_island(island, type_hint) - if abs_path.lower().endswith(".json"): - data = self._safe_read_json(abs_path) - return self._normalize_document(data, default_type, base_name) - try: - with open(abs_path, "r", encoding="utf-8", errors="replace") as handle: - content = handle.read() - except Exception: - content = "" - doc = self._empty_document(default_type) - doc["name"] = base_name - doc["script"] = (content or "").replace("\r\n", "\n") - if default_type == "ansible": - doc["category"] = "application" - return doc - - def _log_action(self, action: str, message: str) -> None: - try: - self.service_log("assemblies", f"{action}: {message}") - except Exception: - self.logger.debug("Failed to record assemblies log entry for %s", action, exc_info=True) + def _dev_mode_enabled(self) -> bool: + return bool(session.get("assemblies_dev_mode", False)) + def set_dev_mode(self, enabled: bool) -> None: + session["assemblies_dev_mode"] = bool(enabled) # ------------------------------------------------------------------ - # CRUD operations + # Domain helpers # ------------------------------------------------------------------ - def create(self, payload: Mapping[str, Any]) -> Tuple[MutableMapping[str, Any], int]: - island = (payload.get("island") or "").strip() - kind = (payload.get("kind") or "").strip().lower() - path_value = (payload.get("path") or "").strip() - content_value = payload.get("content") - item_type = (payload.get("type") or "").strip().lower() - try: - root, abs_path, rel_norm = self._resolve_assembly_path(island, path_value) - if not rel_norm: - return {"error": "path required"}, 400 - if kind == "folder": - os.makedirs(abs_path, exist_ok=True) - self._log_action("create-folder", f"island={island} rel_path={rel_norm}") - return {"status": "ok"}, 200 - - if kind != "file": - return {"error": "invalid kind"}, 400 - - base, ext = os.path.splitext(abs_path) - if not ext: - abs_path = base + self._default_ext_for_island(island, item_type) - os.makedirs(os.path.dirname(abs_path), exist_ok=True) - isl = (island or "").lower() - if isl in ("workflows", "workflow"): - obj = self._coerce_workflow_dict(content_value) - base_name = os.path.splitext(os.path.basename(abs_path))[0] - obj.setdefault("tab_name", base_name) - with open(abs_path, "w", encoding="utf-8") as handle: - json.dump(obj, handle, indent=2) - else: - obj = self._coerce_generic_dict(content_value) - base_name = os.path.splitext(os.path.basename(abs_path))[0] - normalized = self._normalize_document(obj, self._default_type_for_island(island, item_type), base_name) - with open(abs_path, "w", encoding="utf-8") as handle: - json.dump(self._prepare_storage(normalized), handle, indent=2) - rel_new = os.path.relpath(abs_path, root).replace(os.sep, "/") - self._log_action("create-file", f"island={island} rel_path={rel_new}") - return {"status": "ok", "rel_path": rel_new}, 200 - except ValueError as err: - return {"error": str(err)}, 400 - except Exception as exc: # pragma: no cover - defensive logging - self.logger.exception("Failed to create assembly", exc_info=exc) - return {"error": str(exc)}, 500 - - def edit(self, payload: Mapping[str, Any]) -> Tuple[MutableMapping[str, Any], int]: - island = (payload.get("island") or "").strip() - path_value = (payload.get("path") or "").strip() - content_value = payload.get("content") - data_type = (payload.get("type") or "").strip() - try: - root, abs_path, _ = self._resolve_assembly_path(island, path_value) - if not os.path.isfile(abs_path): - return {"error": "file not found"}, 404 - target_abs = abs_path - if not abs_path.lower().endswith(".json"): - base, _ = os.path.splitext(abs_path) - target_abs = base + self._default_ext_for_island(island, data_type) - - isl = (island or "").lower() - if isl in ("workflows", "workflow"): - obj = self._coerce_workflow_dict(content_value, strict=True) - with open(target_abs, "w", encoding="utf-8") as handle: - json.dump(obj, handle, indent=2) - else: - obj = self._coerce_generic_dict(content_value) - base_name = os.path.splitext(os.path.basename(target_abs))[0] - normalized = self._normalize_document( - obj, - self._default_type_for_island(island, obj.get("type") if isinstance(obj, dict) else ""), - base_name, - ) - with open(target_abs, "w", encoding="utf-8") as handle: - json.dump(self._prepare_storage(normalized), handle, indent=2) - - if target_abs != abs_path: - try: - os.remove(abs_path) - except Exception: - pass - - rel_new = os.path.relpath(target_abs, root).replace(os.sep, "/") - self._log_action("edit", f"island={island} rel_path={rel_new}") - return {"status": "ok", "rel_path": rel_new}, 200 - except ValueError as err: - return {"error": str(err)}, 400 - except Exception as exc: # pragma: no cover - self.logger.exception("Failed to edit assembly", exc_info=exc) - return {"error": str(exc)}, 500 - - def rename(self, payload: Mapping[str, Any]) -> Tuple[MutableMapping[str, Any], int]: - island = (payload.get("island") or "").strip() - kind = (payload.get("kind") or "").strip().lower() - path_value = (payload.get("path") or "").strip() - new_name = (payload.get("new_name") or "").strip() - item_type = (payload.get("type") or "").strip().lower() - if not new_name: - return {"error": "new_name required"}, 400 - try: - root, old_abs, _ = self._resolve_assembly_path(island, path_value) - if kind == "folder": - if not os.path.isdir(old_abs): - return {"error": "folder not found"}, 404 - new_abs = os.path.join(os.path.dirname(old_abs), new_name) - elif kind == "file": - if not os.path.isfile(old_abs): - return {"error": "file not found"}, 404 - base, ext = os.path.splitext(new_name) - if not ext: - new_name = base + self._default_ext_for_island(island, item_type) - new_abs = os.path.join(os.path.dirname(old_abs), os.path.basename(new_name)) - else: - return {"error": "invalid kind"}, 400 - - new_abs_norm = os.path.abspath(new_abs) - if not new_abs_norm.startswith(root): - return {"error": "invalid destination"}, 400 - - os.rename(old_abs, new_abs_norm) - - isl = (island or "").lower() - if kind == "file" and isl in ("workflows", "workflow"): - try: - obj = self._safe_read_json(new_abs_norm) - base_name = os.path.splitext(os.path.basename(new_abs_norm))[0] - for key in ("tabName", "tab_name", "name", "title"): - if key in obj: - obj[key] = base_name - obj.setdefault("tab_name", base_name) - with open(new_abs_norm, "w", encoding="utf-8") as handle: - json.dump(obj, handle, indent=2) - except Exception: - self.logger.debug("Failed to normalize workflow metadata after rename", exc_info=True) - - rel_new = os.path.relpath(new_abs_norm, root).replace(os.sep, "/") - self._log_action("rename", f"island={island} from={path_value} to={rel_new}") - return {"status": "ok", "rel_path": rel_new}, 200 - except ValueError as err: - return {"error": str(err)}, 400 - except Exception as exc: # pragma: no cover - self.logger.exception("Failed to rename assembly", exc_info=exc) - return {"error": str(exc)}, 500 - - def move(self, payload: Mapping[str, Any]) -> Tuple[MutableMapping[str, Any], int]: - island = (payload.get("island") or "").strip() - path_value = (payload.get("path") or "").strip() - new_path = (payload.get("new_path") or "").strip() - kind = (payload.get("kind") or "").strip().lower() - try: - _, old_abs, _ = self._resolve_assembly_path(island, path_value) - root, new_abs, _ = self._resolve_assembly_path(island, new_path) - if kind == "folder": - if not os.path.isdir(old_abs): - return {"error": "folder not found"}, 404 - else: - if not os.path.isfile(old_abs): - return {"error": "file not found"}, 404 - os.makedirs(os.path.dirname(new_abs), exist_ok=True) - shutil.move(old_abs, new_abs) - rel_new = os.path.relpath(new_abs, root).replace(os.sep, "/") - self._log_action("move", f"island={island} from={path_value} to={rel_new}") - return {"status": "ok", "rel_path": rel_new}, 200 - except ValueError as err: - return {"error": str(err)}, 400 - except Exception as exc: # pragma: no cover - self.logger.exception("Failed to move assembly", exc_info=exc) - return {"error": str(exc)}, 500 - - def delete(self, payload: Mapping[str, Any]) -> Tuple[MutableMapping[str, Any], int]: - island = (payload.get("island") or "").strip() - kind = (payload.get("kind") or "").strip().lower() - path_value = (payload.get("path") or "").strip() - try: - root, abs_path, rel_norm = self._resolve_assembly_path(island, path_value) - if not rel_norm: - return {"error": "cannot delete root"}, 400 - if kind == "folder": - if not os.path.isdir(abs_path): - return {"error": "folder not found"}, 404 - shutil.rmtree(abs_path) - elif kind == "file": - if not os.path.isfile(abs_path): - return {"error": "file not found"}, 404 - os.remove(abs_path) - else: - return {"error": "invalid kind"}, 400 - self._log_action("delete", f"island={island} rel_path={rel_norm} kind={kind}") - return {"status": "ok"}, 200 - except ValueError as err: - return {"error": str(err)}, 400 - except Exception as exc: # pragma: no cover - self.logger.exception("Failed to delete assembly", exc_info=exc) - return {"error": str(exc)}, 500 - - def list_items(self, island: str) -> Tuple[MutableMapping[str, Any], int]: - island = (island or "").strip() - try: - root = self._resolve_island_root(island) - if not root: - return {"error": "invalid island"}, 400 - os.makedirs(root, exist_ok=True) - - items: List[Dict[str, Any]] = [] - folders: List[str] = [] - isl = island.lower() - - if isl in ("workflows", "workflow"): - exts = (".json",) - for dirpath, dirnames, filenames in os.walk(root): - rel_root = os.path.relpath(dirpath, root) - if rel_root != ".": - folders.append(rel_root.replace(os.sep, "/")) - for fname in filenames: - if not fname.lower().endswith(exts): - continue - fp = os.path.join(dirpath, fname) - rel_path = os.path.relpath(fp, root).replace(os.sep, "/") - try: - mtime = os.path.getmtime(fp) - except Exception: - mtime = 0.0 - obj = self._safe_read_json(fp) - tab = self._extract_tab_name(obj) - items.append( - { - "file_name": fname, - "rel_path": rel_path, - "type": "workflow", - "tab_name": tab, - "last_edited": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(mtime)), - "last_edited_epoch": mtime, - } - ) - elif isl in ("scripts", "script"): - exts = (".json", ".ps1", ".bat", ".sh") - for dirpath, dirnames, filenames in os.walk(root): - rel_root = os.path.relpath(dirpath, root) - if rel_root != ".": - folders.append(rel_root.replace(os.sep, "/")) - for fname in filenames: - if not fname.lower().endswith(exts): - continue - fp = os.path.join(dirpath, fname) - rel_path = os.path.relpath(fp, root).replace(os.sep, "/") - try: - mtime = os.path.getmtime(fp) - except Exception: - mtime = 0.0 - stype = self._detect_script_type(fp) - doc = self._load_assembly_document(fp, "scripts", stype) - items.append( - { - "file_name": fname, - "rel_path": rel_path, - "type": doc.get("type", stype), - "name": doc.get("name"), - "category": doc.get("category"), - "description": doc.get("description"), - "last_edited": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(mtime)), - "last_edited_epoch": mtime, - } - ) - else: - exts = (".json", ".yml") - for dirpath, dirnames, filenames in os.walk(root): - rel_root = os.path.relpath(dirpath, root) - if rel_root != ".": - folders.append(rel_root.replace(os.sep, "/")) - for fname in filenames: - if not fname.lower().endswith(exts): - continue - fp = os.path.join(dirpath, fname) - rel_path = os.path.relpath(fp, root).replace(os.sep, "/") - try: - mtime = os.path.getmtime(fp) - except Exception: - mtime = 0.0 - stype = self._detect_script_type(fp) - doc = self._load_assembly_document(fp, "ansible", stype) - items.append( - { - "file_name": fname, - "rel_path": rel_path, - "type": doc.get("type", "ansible"), - "name": doc.get("name"), - "category": doc.get("category"), - "description": doc.get("description"), - "last_edited": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(mtime)), - "last_edited_epoch": mtime, - } - ) - - items.sort(key=lambda row: row.get("last_edited_epoch", 0.0), reverse=True) - return {"root": root, "items": items, "folders": folders}, 200 - except ValueError as err: - return {"error": str(err)}, 400 - except Exception as exc: # pragma: no cover - self.logger.exception("Failed to list assemblies", exc_info=exc) - return {"error": str(exc)}, 500 - - def load(self, island: str, rel_path: str) -> Tuple[MutableMapping[str, Any], int]: - island = (island or "").strip() - rel_path = (rel_path or "").strip() - try: - root, abs_path, _ = self._resolve_assembly_path(island, rel_path) - if not os.path.isfile(abs_path): - return {"error": "file not found"}, 404 - isl = island.lower() - if isl in ("workflows", "workflow"): - obj = self._safe_read_json(abs_path) - return obj, 200 - doc = self._load_assembly_document(abs_path, island) - rel = os.path.relpath(abs_path, root).replace(os.sep, "/") - result: Dict[str, Any] = { - "file_name": os.path.basename(abs_path), - "rel_path": rel, - "type": doc.get("type"), - "assembly": doc, - "content": doc.get("script"), - } - return result, 200 - except ValueError as err: - return {"error": str(err)}, 400 - except Exception as exc: # pragma: no cover - self.logger.exception("Failed to load assembly", exc_info=exc) - return {"error": str(exc)}, 500 - - - # ------------------------------------------------------------------ - # Content coercion - # ------------------------------------------------------------------ - def _coerce_generic_dict(self, value: Any) -> Dict[str, Any]: - obj = value - if isinstance(obj, str): - try: - obj = json.loads(obj) - except Exception: - obj = {} - if not isinstance(obj, dict): - obj = {} - return obj - - def _coerce_workflow_dict(self, value: Any, strict: bool = False) -> Dict[str, Any]: - obj = value - if isinstance(obj, str): - obj = json.loads(obj) - if not isinstance(obj, dict): - if strict: - raise ValueError("invalid content for workflow") - obj = {} - return obj + @staticmethod + def parse_domain(value: Any) -> Optional[AssemblyDomain]: + if value is None: + return None + candidate = str(value).strip().lower() + for domain in AssemblyDomain: + if domain.value == candidate: + return domain + return None def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: """Register assembly CRUD endpoints on the Flask app.""" - service = AssemblyManagementService(adapters) - blueprint = Blueprint("assemblies", __name__) + service = AssemblyAPIService(app, adapters) + blueprint = Blueprint("assemblies", __name__, url_prefix="/api/assemblies") - @blueprint.route("/api/assembly/create", methods=["POST"]) - def _create(): + # ------------------------------------------------------------------ + # Collections + # ------------------------------------------------------------------ + @blueprint.route("", methods=["GET"]) + def list_assemblies(): + _, error = service.require_user() + if error: + return jsonify(error[0]), error[1] + + domain = request.args.get("domain") + kind = request.args.get("kind") + items = service.runtime.list_assemblies(domain=domain, kind=kind) + queue_state = service.runtime.queue_snapshot() + return jsonify({"items": items, "queue": queue_state}), 200 + + # ------------------------------------------------------------------ + # Single assembly retrieval + # ------------------------------------------------------------------ + @blueprint.route("/", methods=["GET"]) + def get_assembly(assembly_guid: str): + _, error = service.require_user() + if error: + return jsonify(error[0]), error[1] + data = service.runtime.get_assembly(assembly_guid) + if not data: + return jsonify({"error": "not found"}), 404 + data["queue"] = service.runtime.queue_snapshot() + return jsonify(data), 200 + + # ------------------------------------------------------------------ + # Creation + # ------------------------------------------------------------------ + @blueprint.route("", methods=["POST"]) + def create_assembly(): payload = request.get_json(silent=True) or {} - response, status = service.create(payload) - return jsonify(response), status + domain = service.parse_domain(payload.get("domain")) + if domain is None: + return jsonify({"error": "invalid domain"}), 400 + error = service.require_mutation_for_domain(domain) + if error: + return jsonify(error[0]), error[1] + try: + record = service.runtime.create_assembly(payload) + return jsonify(record), 201 + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + except Exception: # pragma: no cover - runtime guard + service.logger.exception("Failed to create assembly.") + return jsonify({"error": "internal server error"}), 500 - @blueprint.route("/api/assembly/edit", methods=["POST"]) - def _edit(): + # ------------------------------------------------------------------ + # Update + # ------------------------------------------------------------------ + @blueprint.route("/", methods=["PUT"]) + def update_assembly(assembly_guid: str): payload = request.get_json(silent=True) or {} - response, status = service.edit(payload) - return jsonify(response), status + existing = service.runtime.get_cached_entry(assembly_guid) + if not existing: + return jsonify({"error": "not found"}), 404 + error = service.require_mutation_for_domain(existing.domain) + if error: + return jsonify(error[0]), error[1] + try: + record = service.runtime.update_assembly(assembly_guid, payload) + return jsonify(record), 200 + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + except Exception: # pragma: no cover - runtime guard + service.logger.exception("Failed to update assembly %s.", assembly_id) + return jsonify({"error": "internal server error"}), 500 - @blueprint.route("/api/assembly/rename", methods=["POST"]) - def _rename(): + # ------------------------------------------------------------------ + # Deletion + # ------------------------------------------------------------------ + @blueprint.route("/", methods=["DELETE"]) + def delete_assembly(assembly_guid: str): + existing = service.runtime.get_cached_entry(assembly_guid) + if not existing: + return jsonify({"error": "not found"}), 404 + error = service.require_mutation_for_domain(existing.domain) + if error: + return jsonify(error[0]), error[1] + try: + service.runtime.delete_assembly(assembly_guid) + return jsonify({"status": "queued"}), 202 + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + except Exception: # pragma: no cover + service.logger.exception("Failed to delete assembly %s.", assembly_id) + return jsonify({"error": "internal server error"}), 500 + + # ------------------------------------------------------------------ + # Clone between domains + # ------------------------------------------------------------------ + @blueprint.route("//clone", methods=["POST"]) + def clone_assembly(assembly_guid: str): payload = request.get_json(silent=True) or {} - response, status = service.rename(payload) - return jsonify(response), status + target_domain_value = payload.get("target_domain") + domain = service.parse_domain(target_domain_value) + if domain is None: + return jsonify({"error": "invalid target domain"}), 400 + error = service.require_mutation_for_domain(domain) + if error: + return jsonify(error[0]), error[1] + new_guid = payload.get("new_assembly_guid") + try: + record = service.runtime.clone_assembly( + assembly_guid, + target_domain=domain.value, + new_assembly_guid=new_guid, + ) + return jsonify(record), 201 + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + except Exception: # pragma: no cover + service.logger.exception("Failed to clone assembly %s.", assembly_id) + return jsonify({"error": "internal server error"}), 500 - @blueprint.route("/api/assembly/move", methods=["POST"]) - def _move(): + # ------------------------------------------------------------------ + # Dev Mode toggle + # ------------------------------------------------------------------ + @blueprint.route("/dev-mode/switch", methods=["POST"]) + def switch_dev_mode(): + error = service.require_admin() + if error: + return jsonify(error[0]), error[1] payload = request.get_json(silent=True) or {} - response, status = service.move(payload) - return jsonify(response), status + enabled = bool(payload.get("enabled")) + service.set_dev_mode(enabled) + return jsonify({"dev_mode": service._dev_mode_enabled()}), 200 - @blueprint.route("/api/assembly/delete", methods=["POST"]) - def _delete(): - payload = request.get_json(silent=True) or {} - response, status = service.delete(payload) - return jsonify(response), status + # ------------------------------------------------------------------ + # Immediate flush + # ------------------------------------------------------------------ + @blueprint.route("/dev-mode/write", methods=["POST"]) + def flush_assemblies(): + error = service.require_admin(dev_mode_required=True) + if error: + return jsonify(error[0]), error[1] + try: + service.runtime.flush_writes() + return jsonify({"status": "flushed"}), 200 + except Exception: # pragma: no cover + service.logger.exception("Failed to flush assembly queue.") + return jsonify({"error": "internal server error"}), 500 - @blueprint.route("/api/assembly/list", methods=["GET"]) - def _list(): - response, status = service.list_items(request.args.get("island", "")) - return jsonify(response), status - - @blueprint.route("/api/assembly/load", methods=["GET"]) - def _load(): - response, status = service.load(request.args.get("island", ""), request.args.get("path", "")) - return jsonify(response), status + # ------------------------------------------------------------------ + # Official sync + # ------------------------------------------------------------------ + @blueprint.route("/official/sync", methods=["POST"]) + def sync_official(): + error = service.require_admin(dev_mode_required=True) + if error: + return jsonify(error[0]), error[1] + try: + service.runtime.sync_official() + return jsonify({"status": "synced"}), 200 + except Exception: # pragma: no cover + service.logger.exception("Official assembly sync failed.") + return jsonify({"error": "internal server error"}), 500 app.register_blueprint(blueprint) - assemblies_execution.register_execution(app, adapters) - diff --git a/Data/Engine/services/assemblies/service.py b/Data/Engine/services/assemblies/service.py new file mode 100644 index 00000000..d69e98a4 --- /dev/null +++ b/Data/Engine/services/assemblies/service.py @@ -0,0 +1,332 @@ +# ====================================================== +# Data\Engine\services\assemblies\service.py +# Description: Provides assembly CRUD helpers backed by the AssemblyCache and SQLite persistence domains. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Runtime assembly management helpers for API routes.""" + +from __future__ import annotations + +import copy +import datetime as _dt +import hashlib +import json +import logging +import uuid +from typing import Any, Dict, List, Mapping, Optional + +from Data.Engine.assembly_management.bootstrap import AssemblyCache +from Data.Engine.assembly_management.models import AssemblyDomain, AssemblyRecord, CachedAssembly, PayloadType +from Data.Engine.assembly_management.sync import sync_official_domain + + +class AssemblyRuntimeService: + """High-level assembly operations backed by :class:`AssemblyCache`.""" + + def __init__(self, cache: AssemblyCache, *, logger: Optional[logging.Logger] = None) -> None: + if cache is None: + raise RuntimeError("Assembly cache is not initialised; assemble the Engine runtime first.") + self._cache = cache + self._logger = logger or logging.getLogger(__name__) + + # ------------------------------------------------------------------ + # Query helpers + # ------------------------------------------------------------------ + def list_assemblies( + self, + *, + domain: Optional[str] = None, + kind: Optional[str] = None, + ) -> List[Dict[str, Any]]: + domain_filter = _coerce_domain(domain) if domain else None + entries = self._cache.list_entries(domain=domain_filter) + results: List[Dict[str, Any]] = [] + for entry in entries: + record = entry.record + if kind and record.assembly_kind.lower() != kind.lower(): + continue + results.append(self._serialize_entry(entry, include_payload=False)) + return results + + def get_assembly(self, assembly_guid: str) -> Optional[Dict[str, Any]]: + entry = self._cache.get_entry(assembly_guid) + if not entry: + return None + payload_text = self._read_payload_text(entry.record.assembly_guid) + data = self._serialize_entry(entry, include_payload=True, payload_text=payload_text) + return data + + def get_cached_entry(self, assembly_guid: str) -> Optional[CachedAssembly]: + return self._cache.get_entry(assembly_guid) + + def queue_snapshot(self) -> List[Dict[str, Any]]: + return self._cache.describe() + + # ------------------------------------------------------------------ + # Mutations + # ------------------------------------------------------------------ + def create_assembly(self, payload: Mapping[str, Any]) -> Dict[str, Any]: + assembly_guid = _coerce_guid(payload.get("assembly_guid")) + if not assembly_guid: + assembly_guid = uuid.uuid4().hex + if self._cache.get_entry(assembly_guid): + raise ValueError(f"Assembly '{assembly_guid}' already exists") + + domain = _expect_domain(payload.get("domain")) + record = self._build_record( + assembly_guid=assembly_guid, + domain=domain, + payload=payload, + existing=None, + ) + self._cache.stage_upsert(domain, record) + return self.get_assembly(assembly_guid) or {} + + def update_assembly(self, assembly_guid: str, payload: Mapping[str, Any]) -> Dict[str, Any]: + existing = self._cache.get_entry(assembly_guid) + if not existing: + raise ValueError(f"Assembly '{assembly_guid}' not found") + record = self._build_record( + assembly_guid=assembly_guid, + domain=existing.domain, + payload=payload, + existing=existing, + ) + self._cache.stage_upsert(existing.domain, record) + return self.get_assembly(assembly_guid) or {} + + def delete_assembly(self, assembly_guid: str) -> None: + entry = self._cache.get_entry(assembly_guid) + if not entry: + raise ValueError(f"Assembly '{assembly_guid}' not found") + self._cache.stage_delete(assembly_guid) + + def clone_assembly( + self, + assembly_guid: str, + *, + target_domain: str, + new_assembly_guid: Optional[str] = None, + ) -> Dict[str, Any]: + source_entry = self._cache.get_entry(assembly_guid) + if not source_entry: + raise ValueError(f"Assembly '{assembly_guid}' not found") + + domain = _expect_domain(target_domain) + clone_guid = _coerce_guid(new_assembly_guid) + if not clone_guid: + clone_guid = uuid.uuid4().hex + if self._cache.get_entry(clone_guid): + raise ValueError(f"Assembly '{clone_guid}' already exists; provide a unique identifier.") + + payload_text = self._read_payload_text(assembly_guid) + descriptor = self._cache.payload_manager.store_payload( + _payload_type_from_kind(source_entry.record.assembly_kind), + payload_text, + assembly_guid=clone_guid, + extension=".json", + ) + + now = _utcnow() + record = AssemblyRecord( + assembly_guid=clone_guid, + display_name=source_entry.record.display_name, + summary=source_entry.record.summary, + category=source_entry.record.category, + assembly_kind=source_entry.record.assembly_kind, + assembly_type=source_entry.record.assembly_type, + version=source_entry.record.version, + payload=descriptor, + metadata=copy.deepcopy(source_entry.record.metadata), + tags=copy.deepcopy(source_entry.record.tags), + checksum=hashlib.sha256(payload_text.encode("utf-8")).hexdigest(), + created_at=now, + updated_at=now, + ) + self._cache.stage_upsert(domain, record) + return self.get_assembly(clone_guid) or {} + + def flush_writes(self) -> None: + self._cache.flush_now() + + def sync_official(self) -> None: + db_manager = self._cache.database_manager + payload_manager = self._cache.payload_manager + staging_root = db_manager.staging_root + sync_official_domain(db_manager, payload_manager, staging_root, logger=self._logger) + self._cache.reload() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _build_record( + self, + *, + assembly_guid: str, + domain: AssemblyDomain, + payload: Mapping[str, Any], + existing: Optional[CachedAssembly], + ) -> AssemblyRecord: + now = _utcnow() + kind = str(payload.get("assembly_kind") or (existing.record.assembly_kind if existing else "") or "script").lower() + metadata = _coerce_dict(payload.get("metadata")) + display_name = payload.get("display_name") or metadata.get("display_name") + summary = payload.get("summary") or metadata.get("summary") + category = payload.get("category") or metadata.get("category") + assembly_type = payload.get("assembly_type") or metadata.get("assembly_type") + version = _coerce_int(payload.get("version"), default=existing.record.version if existing else 1) + tags = _coerce_dict(payload.get("tags") or (existing.record.tags if existing else {})) + + payload_content = payload.get("payload") + payload_text = _serialize_payload(payload_content) if payload_content is not None else None + + if existing: + metadata = _merge_metadata(existing.record.metadata, metadata) + if payload_text is None: + # Keep existing payload descriptor/content + descriptor = existing.record.payload + payload_text = self._read_payload_text(existing.record.assembly_guid) + else: + descriptor = self._cache.payload_manager.update_payload(existing.record.payload, payload_text) + else: + if payload_text is None: + raise ValueError("payload content required for new assemblies") + descriptor = self._cache.payload_manager.store_payload( + _payload_type_from_kind(kind), + payload_text, + assembly_guid=assembly_guid, + extension=".json", + ) + + checksum = hashlib.sha256(payload_text.encode("utf-8")).hexdigest() + record = AssemblyRecord( + assembly_guid=assembly_guid, + display_name=display_name or assembly_guid, + summary=summary, + category=category, + assembly_kind=kind, + assembly_type=assembly_type, + version=version, + payload=descriptor, + metadata=metadata, + tags=tags, + checksum=checksum, + created_at=existing.record.created_at if existing else now, + updated_at=now, + ) + return record + + def _serialize_entry( + self, + entry: CachedAssembly, + *, + include_payload: bool, + payload_text: Optional[str] = None, + ) -> Dict[str, Any]: + record = entry.record + data: Dict[str, Any] = { + "assembly_guid": record.assembly_guid, + "display_name": record.display_name, + "summary": record.summary, + "category": record.category, + "assembly_kind": record.assembly_kind, + "assembly_type": record.assembly_type, + "version": record.version, + "source": entry.domain.value, + "is_dirty": entry.is_dirty, + "dirty_since": entry.dirty_since.isoformat() if entry.dirty_since else None, + "last_persisted": entry.last_persisted.isoformat() if entry.last_persisted else None, + "payload_guid": record.payload.assembly_guid, + "created_at": record.created_at.isoformat(), + "updated_at": record.updated_at.isoformat(), + "metadata": copy.deepcopy(record.metadata), + "tags": copy.deepcopy(record.tags), + } + data.setdefault("assembly_id", record.assembly_guid) # legacy alias for older clients + if include_payload: + payload_text = payload_text if payload_text is not None else self._read_payload_text(record.assembly_guid) + data["payload"] = payload_text + try: + data["payload_json"] = json.loads(payload_text) + except Exception: + data["payload_json"] = None + return data + + def _read_payload_text(self, assembly_guid: str) -> str: + payload_bytes = self._cache.read_payload_bytes(assembly_guid) + try: + return payload_bytes.decode("utf-8") + except UnicodeDecodeError: + return payload_bytes.decode("utf-8", errors="replace") + + +def _coerce_domain(value: Any) -> Optional[AssemblyDomain]: + if value is None: + return None + text = str(value).strip().lower() + for domain in AssemblyDomain: + if domain.value == text: + return domain + return None + + +def _expect_domain(value: Any) -> AssemblyDomain: + domain = _coerce_domain(value) + if domain is None: + raise ValueError("invalid domain") + return domain + + +def _coerce_guid(value: Any) -> Optional[str]: + if value is None: + return None + text = str(value).strip() + if not text: + return None + return text.lower() + + +def _payload_type_from_kind(kind: str) -> PayloadType: + kind_lower = (kind or "").lower() + if kind_lower == "workflow": + return PayloadType.WORKFLOW + if kind_lower == "script": + return PayloadType.SCRIPT + if kind_lower == "ansible": + return PayloadType.BINARY + return PayloadType.UNKNOWN + + +def _serialize_payload(value: Any) -> str: + if isinstance(value, (dict, list)): + return json.dumps(value, indent=2, sort_keys=True) + if isinstance(value, str): + return value + raise ValueError("payload must be JSON object, array, or string") + + +def _coerce_dict(value: Any) -> Dict[str, Any]: + if isinstance(value, dict): + return copy.deepcopy(value) + return {} + + +def _merge_metadata(existing: Dict[str, Any], new_values: Dict[str, Any]) -> Dict[str, Any]: + combined = copy.deepcopy(existing or {}) + for key, val in (new_values or {}).items(): + combined[key] = val + return combined + + +def _coerce_int(value: Any, *, default: int = 0) -> int: + try: + return int(value) + except Exception: + return default + + +def _utcnow() -> _dt.datetime: + return _dt.datetime.utcnow().replace(microsecond=0) +