diff --git a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md index abaa5103..8ece9020 100644 --- a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md +++ b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md @@ -9,15 +9,15 @@ ## 1. Implement multi-database assembly persistence with payload indirection -[ ] Define three SQLite databases (`official.db`, `community.db`, `user_created.db`) stored under `Data/Engine/Assemblies/` and mirrored to `/Engine/Assemblies/` at runtime. -[ ] Standardize a shared schema (per your column list) and enable WAL + shared-cache on all connections. -[ ] Add payload GUID indirection so large scripts/workflows/binaries live under `Data/Engine/Assemblies/Payloads/` and `/Engine/Assemblies/Payloads/`; the DB stores GUID references, not raw base64. -[ ] Build a startup loader that opens each database, validates schema, and hydrates an in-memory cache keyed by `assembly_id` with metadata, payload handles, and source domain. -[ ] Implement a write queue service that stages mutations in memory, marks cache entries as “dirty,” persists them on a 60-second cadence (configurable), and handles graceful shutdown flushing. -[ ] Expose cache state so callers can detect queued writes vs. persisted rows. +[x] Define three SQLite databases (`official.db`, `community.db`, `user_created.db`) stored under `Data/Engine/Assemblies/` and mirrored to `/Engine/Assemblies/` at runtime. +[x] Standardize a shared schema (per your column list) and enable WAL + shared-cache on all connections. +[x] Add payload GUID indirection so large scripts/workflows/binaries live under `Data/Engine/Assemblies/Payloads/` and `/Engine/Assemblies/Payloads/`; the DB stores GUID references, not raw base64. +[x] Build a startup loader that opens each database, validates schema, and hydrates an in-memory cache keyed by `assembly_id` with metadata, payload handles, and source domain. +[x] Implement a write queue service that stages mutations in memory, marks cache entries as “dirty,” persists them on a 60-second cadence (configurable), and handles graceful shutdown flushing. +[x] Expose cache state so callers can detect queued writes vs. persisted rows. ### Details ``` -1. Under `Data/Engine/engine/assemblies/`, add a new package (e.g., `persistence/`) containing: +1. Under `Data/Engine/assembly_management/`, add the management package containing: * `databases.py` for connection management (WAL, pragmas, attach logic). * `models.py` defining dataclasses for assemblies/payload metadata. @@ -40,6 +40,13 @@ 5. Include integrity checks/logging: detect missing payload files, log warnings, and surface errors to API callers. ``` +**Stage Notes** +- Added `Data/Engine/assembly_management/` with `databases.py`, `models.py`, `payloads.py`, and `bootstrap.py` to manage multi-domain SQLite storage, payload GUID indirection, and the timed write queue. +- `AssemblyDatabaseManager.initialise()` now creates `official.db`, `community.db`, and `user_created.db` in the staging tree with WAL/shared-cache pragmas and mirrors them to `/Engine/Assemblies/`. +- `PayloadManager` persists payload content beneath `Payloads/` in both staging and runtime directories, computing SHA-256 checksums for metadata. +- `AssemblyCache` hydrates all domains at startup, exposes `describe()` for dirty state inspection, and flushes staged writes on a configurable cadence (default 60 s) with an atexit shutdown hook. 
+- `initialise_assembly_runtime()` is invoked from both `create_app` and `register_engine_api`, wiring the cache onto `EngineContext` and ensuring graceful shutdown flushing. + ## 2. Update Engine services and APIs for multi-domain assemblies [ ] Refactor existing assembly REST endpoints to read from the cache instead of filesystem JSON. [ ] Add source metadata (`official`, `community`, `user`) to API responses. @@ -132,4 +139,4 @@ * Database layout. * Dev Mode usage instructions. * Backup guidance (even if future work, note current expectations). -``` \ No newline at end of file +``` diff --git a/Data/Engine/assembly_management/__init__.py b/Data/Engine/assembly_management/__init__.py new file mode 100644 index 00000000..22adbc37 --- /dev/null +++ b/Data/Engine/assembly_management/__init__.py @@ -0,0 +1,27 @@ +# ====================================================== +# Data\Engine\assembly_management\__init__.py +# Description: Exposes assembly persistence helpers for the Borealis Engine. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Assembly persistence package for the Borealis Engine.""" + +from __future__ import annotations + +from .bootstrap import AssemblyCache, initialise_assembly_runtime +from .databases import AssemblyDatabaseManager +from .models import AssemblyDomain, AssemblyRecord, PayloadDescriptor, PayloadType +from .payloads import PayloadManager + +__all__ = [ + "AssemblyCache", + "AssemblyDatabaseManager", + "AssemblyDomain", + "AssemblyRecord", + "PayloadDescriptor", + "PayloadType", + "PayloadManager", + "initialise_assembly_runtime", +] + diff --git a/Data/Engine/assembly_management/bootstrap.py b/Data/Engine/assembly_management/bootstrap.py new file mode 100644 index 00000000..1f415c96 --- /dev/null +++ b/Data/Engine/assembly_management/bootstrap.py @@ -0,0 +1,278 @@ +# ====================================================== +# Data\Engine\assembly_management\bootstrap.py +# Description: Startup helpers that initialise assembly databases, payload storage, and cache services. 
+# +# API Endpoints (if applicable): None +# ====================================================== + +"""Assembly runtime bootstrap logic.""" + +from __future__ import annotations + +import logging +import threading +from pathlib import Path +from typing import Dict, List, Mapping, Optional + +from .databases import AssemblyDatabaseManager +from .models import AssemblyDomain, AssemblyRecord, CachedAssembly +from .payloads import PayloadManager + + +class AssemblyCache: + """Caches assemblies in memory and coordinates background persistence.""" + + _singleton: Optional["AssemblyCache"] = None + _singleton_lock = threading.Lock() + + @classmethod + def initialise( + cls, + database_manager: AssemblyDatabaseManager, + payload_manager: PayloadManager, + *, + flush_interval_seconds: float = 60.0, + logger: Optional[logging.Logger] = None, + ) -> "AssemblyCache": + with cls._singleton_lock: + if cls._singleton is None: + cls._singleton = cls( + database_manager=database_manager, + payload_manager=payload_manager, + flush_interval_seconds=flush_interval_seconds, + logger=logger, + ) + return cls._singleton + + @classmethod + def get(cls) -> Optional["AssemblyCache"]: + with cls._singleton_lock: + return cls._singleton + + def __init__( + self, + *, + database_manager: AssemblyDatabaseManager, + payload_manager: PayloadManager, + flush_interval_seconds: float, + logger: Optional[logging.Logger], + ) -> None: + self._db_manager = database_manager + self._payload_manager = payload_manager + self._flush_interval = max(5.0, float(flush_interval_seconds)) + self._logger = logger or logging.getLogger(__name__) + self._store: Dict[str, CachedAssembly] = {} + self._dirty: Dict[str, CachedAssembly] = {} + self._pending_deletes: Dict[str, CachedAssembly] = {} + self._domain_index: Dict[AssemblyDomain, Dict[str, CachedAssembly]] = { + domain: {} for domain in AssemblyDomain + } + self._lock = threading.RLock() + self._stop_event = threading.Event() + self._flush_event = threading.Event() + self.reload() + self._worker = threading.Thread(target=self._worker_loop, name="AssemblyCacheFlush", daemon=True) + self._worker.start() + + # ------------------------------------------------------------------ + # Cache interactions + # ------------------------------------------------------------------ + def reload(self) -> None: + """Hydrate cache from persistence.""" + + with self._lock: + self._store.clear() + self._dirty.clear() + self._pending_deletes.clear() + for domain in AssemblyDomain: + self._domain_index[domain].clear() + records = self._db_manager.load_all(domain) + for record in records: + self._payload_manager.ensure_runtime_copy(record.payload) + entry = CachedAssembly(domain=domain, record=record, is_dirty=False, last_persisted=record.updated_at) + self._store[record.assembly_id] = entry + self._domain_index[domain][record.assembly_id] = entry + + def get(self, assembly_id: str) -> Optional[AssemblyRecord]: + with self._lock: + entry = self._store.get(assembly_id) + if not entry: + return None + return entry.record + + def list_records(self, *, domain: Optional[AssemblyDomain] = None) -> List[AssemblyRecord]: + with self._lock: + if domain is None: + return [entry.record for entry in self._store.values()] + return [entry.record for entry in self._domain_index[domain].values()] + + def stage_upsert(self, domain: AssemblyDomain, record: AssemblyRecord) -> None: + with self._lock: + entry = self._store.get(record.assembly_id) + if entry is None: + entry = CachedAssembly(domain=domain, record=record, 
is_dirty=True)
+                entry.mark_dirty()
+                self._store[record.assembly_id] = entry
+            else:
+                if entry.domain is not domain:
+                    # Drop the stale index entry when an assembly changes domain.
+                    self._domain_index[entry.domain].pop(record.assembly_id, None)
+                entry.domain = domain
+                entry.record = record
+                entry.mark_dirty()
+            self._domain_index[domain][record.assembly_id] = entry
+            self._pending_deletes.pop(record.assembly_id, None)
+            self._dirty[record.assembly_id] = entry
+            self._flush_event.set()
+
+    def stage_delete(self, assembly_id: str) -> None:
+        with self._lock:
+            entry = self._store.get(assembly_id)
+            if not entry:
+                return
+            entry.mark_dirty()
+            self._dirty.pop(assembly_id, None)
+            self._pending_deletes[assembly_id] = entry
+            self._flush_event.set()
+
+    def describe(self) -> List[Dict[str, str]]:
+        with self._lock:
+            snapshot = []
+            for assembly_id, entry in self._store.items():
+                snapshot.append(
+                    {
+                        "assembly_id": assembly_id,
+                        "domain": entry.domain.value,
+                        "is_dirty": "true" if entry.is_dirty else "false",
+                        "dirty_since": entry.dirty_since.isoformat() if entry.dirty_since else "",
+                        "last_persisted": entry.last_persisted.isoformat() if entry.last_persisted else "",
+                    }
+                )
+            return snapshot
+
+    def flush_now(self) -> None:
+        self._flush_dirty_entries()
+
+    def shutdown(self, *, flush: bool = True) -> None:
+        self._stop_event.set()
+        self._flush_event.set()
+        if self._worker.is_alive():
+            self._worker.join(timeout=10.0)
+        if flush:
+            self._flush_dirty_entries()
+
+    # ------------------------------------------------------------------
+    # Internal helpers
+    # ------------------------------------------------------------------
+    def _worker_loop(self) -> None:
+        while not self._stop_event.is_set():
+            triggered = self._flush_event.wait(timeout=self._flush_interval)
+            if self._stop_event.is_set():
+                break
+            if triggered:
+                self._flush_event.clear()
+            # Flush on explicit triggers and on the timed cadence alike.
+            self._flush_dirty_entries()
+
+    def _flush_dirty_entries(self) -> None:
+        dirty_items: List[CachedAssembly]
+        delete_items: List[CachedAssembly]
+        with self._lock:
+            if not self._dirty and not self._pending_deletes:
+                return
+            dirty_items = list(self._dirty.values())
+            delete_items = list(self._pending_deletes.values())
+            self._dirty.clear()
+            self._pending_deletes.clear()
+
+        for entry in delete_items:
+            try:
+                self._db_manager.delete_record(entry.domain, entry)
+                self._payload_manager.delete_payload(entry.record.payload)
+                with self._lock:
+                    self._store.pop(entry.record.assembly_id, None)
+                    self._domain_index[entry.domain].pop(entry.record.assembly_id, None)
+            except Exception as exc:
+                self._logger.error(
+                    "Failed to delete assembly %s in domain %s: %s",
+                    entry.record.assembly_id,
+                    entry.domain.value,
+                    exc,
+                )
+                # Re-queue only the failed entry and keep draining the rest;
+                # the dirty flush below must still run.
+                with self._lock:
+                    self._pending_deletes[entry.record.assembly_id] = entry
+
+        for entry in dirty_items:
+            try:
+                self._db_manager.upsert_record(entry.domain, entry)
+                self._payload_manager.ensure_runtime_copy(entry.record.payload)
+                entry.mark_clean()
+            except Exception as exc:
+                self._logger.error(
+                    "Failed to flush assembly %s in domain %s: %s",
+                    entry.record.assembly_id,
+                    entry.domain.value,
+                    exc,
+                )
+                # Re-queue the failed entry and continue so later entries
+                # still persist on this pass.
+                with self._lock:
+                    self._dirty[entry.record.assembly_id] = entry
+
+
+def initialise_assembly_runtime(
+    *,
+    logger: Optional[logging.Logger] = None,
+    config: Optional[Mapping[str, object]] = None,
+) -> AssemblyCache:
+    """Initialise assembly persistence subsystems and return the cache instance."""
+
+    staging_root = _discover_staging_root()
+    runtime_root = _discover_runtime_root()
+    payload_staging = staging_root / "Payloads"
+    payload_runtime = runtime_root / "Payloads"
+
+    db_manager = AssemblyDatabaseManager(staging_root=staging_root,
runtime_root=runtime_root, logger=logger) + db_manager.initialise() + + payload_manager = PayloadManager(staging_root=payload_staging, runtime_root=payload_runtime, logger=logger) + flush_interval = _resolve_flush_interval(config) + + return AssemblyCache.initialise( + database_manager=db_manager, + payload_manager=payload_manager, + flush_interval_seconds=flush_interval, + logger=logger, + ) + + +# ---------------------------------------------------------------------- +# Helper utilities +# ---------------------------------------------------------------------- +def _resolve_flush_interval(config: Optional[Mapping[str, object]]) -> float: + if not config: + return 60.0 + for key in ("assemblies_flush_interval", "ASSEMBLIES_FLUSH_INTERVAL"): + if key in config: + value = config[key] + try: + return max(5.0, float(value)) + except (TypeError, ValueError): + continue + return 60.0 + + +def _discover_runtime_root() -> Path: + module_path = Path(__file__).resolve() + for candidate in (module_path, *module_path.parents): + engine_dir = candidate / "Engine" + assemblies_dir = engine_dir / "Assemblies" + if assemblies_dir.is_dir(): + return assemblies_dir.resolve() + raise RuntimeError("Could not locate runtime assemblies directory (expected Engine/Assemblies).") + + +def _discover_staging_root() -> Path: + module_path = Path(__file__).resolve() + for candidate in (module_path, *module_path.parents): + data_dir = candidate / "Data" / "Engine" / "Assemblies" + if data_dir.is_dir(): + return data_dir.resolve() + raise RuntimeError("Could not locate staging assemblies directory (expected Data/Engine/Assemblies).") + diff --git a/Data/Engine/assembly_management/databases.py b/Data/Engine/assembly_management/databases.py new file mode 100644 index 00000000..1a0d1d77 --- /dev/null +++ b/Data/Engine/assembly_management/databases.py @@ -0,0 +1,329 @@ +# ====================================================== +# Data\Engine\assembly_management\databases.py +# Description: Manages assembly SQLite databases with WAL/shared-cache configuration and schema validation. 
+# +# API Endpoints (if applicable): None +# ====================================================== + +"""SQLite persistence helpers for Engine assemblies.""" + +from __future__ import annotations + +import datetime as _dt +import json +import logging +import shutil +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Iterable, List, Optional + +from .models import AssemblyDomain, AssemblyRecord, CachedAssembly, PayloadDescriptor, PayloadType + + +_SCHEMA_STATEMENTS: Iterable[str] = ( + """ + CREATE TABLE IF NOT EXISTS payloads ( + payload_guid TEXT PRIMARY KEY, + payload_type TEXT NOT NULL, + file_name TEXT NOT NULL, + file_extension TEXT NOT NULL, + size_bytes INTEGER NOT NULL DEFAULT 0, + checksum TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """, + """ + CREATE TABLE IF NOT EXISTS assemblies ( + assembly_id TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + summary TEXT, + category TEXT, + assembly_kind TEXT NOT NULL, + assembly_type TEXT, + version INTEGER NOT NULL DEFAULT 1, + payload_guid TEXT NOT NULL, + metadata_json TEXT, + tags_json TEXT, + checksum TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(payload_guid) REFERENCES payloads(payload_guid) ON DELETE CASCADE + ) + """, + "CREATE INDEX IF NOT EXISTS idx_assemblies_kind ON assemblies(assembly_kind)", + "CREATE INDEX IF NOT EXISTS idx_assemblies_category ON assemblies(category)", +) + + +def _parse_datetime(value: str) -> _dt.datetime: + try: + return _dt.datetime.fromisoformat(value) + except Exception: + return _dt.datetime.utcnow() + + +@dataclass(slots=True) +class AssemblyDatabasePaths: + """Resolved paths for staging and runtime copies of an assembly database.""" + + staging: Path + runtime: Path + + +class AssemblyDatabaseManager: + """Coordinates SQLite database access for assembly persistence.""" + + def __init__(self, staging_root: Path, runtime_root: Path, *, logger: Optional[logging.Logger] = None) -> None: + self._staging_root = staging_root + self._runtime_root = runtime_root + self._logger = logger or logging.getLogger(__name__) + self._paths: Dict[AssemblyDomain, AssemblyDatabasePaths] = {} + self._staging_root.mkdir(parents=True, exist_ok=True) + self._runtime_root.mkdir(parents=True, exist_ok=True) + for domain in AssemblyDomain: + staging = (self._staging_root / domain.database_name).resolve() + runtime = (self._runtime_root / domain.database_name).resolve() + self._paths[domain] = AssemblyDatabasePaths(staging=staging, runtime=runtime) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + def initialise(self) -> None: + """Ensure all databases exist, apply schema, and mirror to the runtime directory.""" + + for domain in AssemblyDomain: + conn = self._open_connection(domain) + try: + self._apply_schema(conn) + conn.commit() + finally: + conn.close() + self._mirror_database(domain) + + def load_all(self, domain: AssemblyDomain) -> List[AssemblyRecord]: + """Load all assembly records for the given domain.""" + + conn = self._open_connection(domain, readonly=True) + try: + cur = conn.cursor() + cur.execute( + """ + SELECT + a.assembly_id AS assembly_id, + a.display_name AS display_name, + a.summary AS summary, + a.category AS category, + a.assembly_kind AS assembly_kind, + a.assembly_type AS assembly_type, + a.version AS version, + a.payload_guid AS payload_guid, + a.metadata_json AS metadata_json, + 
a.tags_json AS tags_json, + a.checksum AS assembly_checksum, + a.created_at AS assembly_created_at, + a.updated_at AS assembly_updated_at, + p.payload_guid AS payload_guid, + p.payload_type AS payload_type, + p.file_name AS payload_file_name, + p.file_extension AS payload_file_extension, + p.size_bytes AS payload_size_bytes, + p.checksum AS payload_checksum, + p.created_at AS payload_created_at, + p.updated_at AS payload_updated_at + FROM assemblies AS a + JOIN payloads AS p ON p.payload_guid = a.payload_guid + """ + ) + records: List[AssemblyRecord] = [] + for row in cur.fetchall(): + payload_type_raw = row["payload_type"] + try: + payload_type = PayloadType(payload_type_raw) + except Exception: + payload_type = PayloadType.UNKNOWN + payload = PayloadDescriptor( + guid=row["payload_guid"], + payload_type=payload_type, + file_name=row["payload_file_name"], + file_extension=row["payload_file_extension"], + size_bytes=row["payload_size_bytes"], + checksum=row["payload_checksum"], + created_at=_parse_datetime(row["payload_created_at"]), + updated_at=_parse_datetime(row["payload_updated_at"]), + ) + metadata_json = row["metadata_json"] or "{}" + tags_json = row["tags_json"] or "{}" + try: + metadata = json.loads(metadata_json) + except Exception: + metadata = {} + try: + tags = json.loads(tags_json) + except Exception: + tags = {} + record = AssemblyRecord( + assembly_id=row["assembly_id"], + display_name=row["display_name"], + summary=row["summary"], + category=row["category"], + assembly_kind=row["assembly_kind"], + assembly_type=row["assembly_type"], + version=row["version"], + payload=payload, + metadata=metadata, + tags=tags, + checksum=row["assembly_checksum"], + created_at=_parse_datetime(row["assembly_created_at"]), + updated_at=_parse_datetime(row["assembly_updated_at"]), + ) + records.append(record) + return records + finally: + conn.close() + + def upsert_record(self, domain: AssemblyDomain, entry: CachedAssembly) -> None: + """Insert or update an assembly record and its payload metadata.""" + + record = entry.record + conn = self._open_connection(domain) + try: + cur = conn.cursor() + payload = record.payload + cur.execute( + """ + INSERT INTO payloads (payload_guid, payload_type, file_name, file_extension, size_bytes, checksum, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(payload_guid) DO UPDATE SET + payload_type = excluded.payload_type, + file_name = excluded.file_name, + file_extension = excluded.file_extension, + size_bytes = excluded.size_bytes, + checksum = excluded.checksum, + updated_at = excluded.updated_at + """, + ( + payload.guid, + payload.payload_type.value, + payload.file_name, + payload.file_extension, + payload.size_bytes, + payload.checksum, + payload.created_at.isoformat(), + payload.updated_at.isoformat(), + ), + ) + metadata_json = json.dumps(record.metadata or {}) + tags_json = json.dumps(record.tags or {}) + cur.execute( + """ + INSERT INTO assemblies ( + assembly_id, + display_name, + summary, + category, + assembly_kind, + assembly_type, + version, + payload_guid, + metadata_json, + tags_json, + checksum, + created_at, + updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(assembly_id) DO UPDATE SET + display_name = excluded.display_name, + summary = excluded.summary, + category = excluded.category, + assembly_kind = excluded.assembly_kind, + assembly_type = excluded.assembly_type, + version = excluded.version, + payload_guid = excluded.payload_guid, + metadata_json = excluded.metadata_json, + tags_json = excluded.tags_json, + checksum = excluded.checksum, + updated_at = excluded.updated_at + """, + ( + record.assembly_id, + record.display_name, + record.summary, + record.category, + record.assembly_kind, + record.assembly_type, + record.version, + payload.guid, + metadata_json, + tags_json, + record.checksum, + record.created_at.isoformat(), + record.updated_at.isoformat(), + ), + ) + conn.commit() + finally: + conn.close() + self._mirror_database(domain) + + def delete_record(self, domain: AssemblyDomain, entry: CachedAssembly) -> None: + """Delete an assembly and its payload metadata.""" + + record = entry.record + conn = self._open_connection(domain) + try: + cur = conn.cursor() + cur.execute("DELETE FROM assemblies WHERE assembly_id = ?", (record.assembly_id,)) + conn.commit() + finally: + conn.close() + self._mirror_database(domain) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _open_connection(self, domain: AssemblyDomain, *, readonly: bool = False) -> sqlite3.Connection: + paths = self._paths[domain] + flags = "ro" if readonly else "rwc" + uri = f"file:{paths.staging.as_posix()}?mode={flags}&cache=shared" + conn = sqlite3.connect(uri, uri=True, timeout=30) + if readonly: + conn.isolation_level = None + conn.row_factory = sqlite3.Row + cur = conn.cursor() + cur.execute("PRAGMA journal_mode=WAL") + cur.execute("PRAGMA synchronous=NORMAL") + cur.execute("PRAGMA foreign_keys=ON") + cur.execute("PRAGMA busy_timeout=5000") + cur.execute("PRAGMA cache_size=-8000") + cur.execute("PRAGMA temp_store=MEMORY") + return conn + + def _apply_schema(self, conn: sqlite3.Connection) -> None: + cur = conn.cursor() + for statement in _SCHEMA_STATEMENTS: + cur.execute(statement) + conn.commit() + + def _mirror_database(self, domain: AssemblyDomain) -> None: + paths = self._paths[domain] + staging_db = paths.staging + runtime_db = paths.runtime + runtime_db.parent.mkdir(parents=True, exist_ok=True) + + for suffix in ("", "-wal", "-shm"): + staging_candidate = staging_db.parent / f"{staging_db.name}{suffix}" + runtime_candidate = runtime_db.parent / f"{runtime_db.name}{suffix}" + if staging_candidate.exists(): + try: + shutil.copy2(staging_candidate, runtime_candidate) + except Exception as exc: # pragma: no cover - best effort mirror + self._logger.debug( + "Failed to mirror assembly database file %s -> %s: %s", + staging_candidate, + runtime_candidate, + exc, + ) + diff --git a/Data/Engine/assembly_management/models.py b/Data/Engine/assembly_management/models.py new file mode 100644 index 00000000..467a16e1 --- /dev/null +++ b/Data/Engine/assembly_management/models.py @@ -0,0 +1,109 @@ +# ====================================================== +# Data\Engine\assembly_management\models.py +# Description: Dataclasses describing assemblies, payload descriptors, and cache state. 
+# +# API Endpoints (if applicable): None +# ====================================================== + +"""Model definitions for assembly persistence.""" + +from __future__ import annotations + +import datetime as _dt +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, Optional + + +class AssemblyDomain(str, Enum): + """Logical source domains mapped to dedicated SQLite databases.""" + + OFFICIAL = "official" + COMMUNITY = "community" + USER = "user" + + @property + def database_name(self) -> str: + mapping = { + AssemblyDomain.OFFICIAL: "official.db", + AssemblyDomain.COMMUNITY: "community.db", + AssemblyDomain.USER: "user_created.db", + } + return mapping[self] + + +class PayloadType(str, Enum): + """Supported payload classifications.""" + + SCRIPT = "script" + WORKFLOW = "workflow" + BINARY = "binary" + UNKNOWN = "unknown" + + +@dataclass(slots=True) +class PayloadDescriptor: + """Represents on-disk payload material referenced by an assembly.""" + + guid: str + payload_type: PayloadType + file_name: str + file_extension: str + size_bytes: int + checksum: Optional[str] + created_at: _dt.datetime + updated_at: _dt.datetime + + def as_dict(self) -> Dict[str, Any]: + return { + "guid": self.guid, + "payload_type": self.payload_type.value, + "file_name": self.file_name, + "file_extension": self.file_extension, + "size_bytes": self.size_bytes, + "checksum": self.checksum, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + } + + +@dataclass(slots=True) +class AssemblyRecord: + """Represents an assembly row hydrated from persistence.""" + + assembly_id: str + display_name: str + summary: Optional[str] + category: Optional[str] + assembly_kind: str + assembly_type: Optional[str] + version: int + payload: PayloadDescriptor + metadata: Dict[str, Any] = field(default_factory=dict) + tags: Dict[str, Any] = field(default_factory=dict) + checksum: Optional[str] = None + created_at: _dt.datetime = field(default_factory=_dt.datetime.utcnow) + updated_at: _dt.datetime = field(default_factory=_dt.datetime.utcnow) + + +@dataclass(slots=True) +class CachedAssembly: + """Wrapper stored in memory with dirty state tracking.""" + + domain: AssemblyDomain + record: AssemblyRecord + is_dirty: bool = False + last_persisted: Optional[_dt.datetime] = None + dirty_since: Optional[_dt.datetime] = None + + def mark_dirty(self) -> None: + now = _dt.datetime.utcnow() + self.is_dirty = True + self.dirty_since = self.dirty_since or now + + def mark_clean(self) -> None: + now = _dt.datetime.utcnow() + self.is_dirty = False + self.last_persisted = now + self.dirty_since = None + diff --git a/Data/Engine/assembly_management/payloads.py b/Data/Engine/assembly_management/payloads.py new file mode 100644 index 00000000..51ad81d4 --- /dev/null +++ b/Data/Engine/assembly_management/payloads.py @@ -0,0 +1,166 @@ +# ====================================================== +# Data\Engine\assembly_management\payloads.py +# Description: Handles payload GUID generation, filesystem storage, and staging/runtime mirroring. 
+# +# API Endpoints (if applicable): None +# ====================================================== + +"""Payload storage helpers for assembly persistence.""" + +from __future__ import annotations + +import datetime as _dt +import hashlib +import logging +import shutil +import uuid +from pathlib import Path +from typing import Optional, Union + +from .models import PayloadDescriptor, PayloadType + + +class PayloadManager: + """Stores payload content on disk and mirrors it to the runtime directory.""" + + def __init__(self, staging_root: Path, runtime_root: Path, *, logger: Optional[logging.Logger] = None) -> None: + self._staging_root = staging_root + self._runtime_root = runtime_root + self._logger = logger or logging.getLogger(__name__) + self._staging_root.mkdir(parents=True, exist_ok=True) + self._runtime_root.mkdir(parents=True, exist_ok=True) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + def store_payload( + self, + payload_type: PayloadType, + content: Union[str, bytes], + *, + guid: Optional[str] = None, + extension: Optional[str] = None, + ) -> PayloadDescriptor: + """Persist payload content and mirror it to the runtime directory.""" + + resolved_guid = self._normalise_guid(guid or uuid.uuid4().hex) + resolved_extension = self._normalise_extension(extension or self._default_extension(payload_type)) + now = _dt.datetime.utcnow() + data = content.encode("utf-8") if isinstance(content, str) else bytes(content) + checksum = hashlib.sha256(data).hexdigest() + + staging_dir = self._payload_dir(self._staging_root, resolved_guid) + runtime_dir = self._payload_dir(self._runtime_root, resolved_guid) + staging_dir.mkdir(parents=True, exist_ok=True) + runtime_dir.mkdir(parents=True, exist_ok=True) + + file_name = f"payload{resolved_extension}" + staging_path = staging_dir / file_name + runtime_path = runtime_dir / file_name + + with staging_path.open("wb") as handle: + handle.write(data) + + try: + shutil.copy2(staging_path, runtime_path) + except Exception as exc: # pragma: no cover - best effort mirror + self._logger.debug("Failed to mirror payload %s to runtime copy: %s", resolved_guid, exc) + + descriptor = PayloadDescriptor( + guid=resolved_guid, + payload_type=payload_type, + file_name=file_name, + file_extension=resolved_extension, + size_bytes=len(data), + checksum=checksum, + created_at=now, + updated_at=now, + ) + return descriptor + + def update_payload(self, descriptor: PayloadDescriptor, content: Union[str, bytes]) -> PayloadDescriptor: + """Replace payload content while retaining GUID and metadata.""" + + data = content.encode("utf-8") if isinstance(content, str) else bytes(content) + checksum = hashlib.sha256(data).hexdigest() + now = _dt.datetime.utcnow() + + staging_path = self._payload_dir(self._staging_root, descriptor.guid) / descriptor.file_name + runtime_path = self._payload_dir(self._runtime_root, descriptor.guid) / descriptor.file_name + staging_path.parent.mkdir(parents=True, exist_ok=True) + runtime_path.parent.mkdir(parents=True, exist_ok=True) + + with staging_path.open("wb") as handle: + handle.write(data) + + try: + shutil.copy2(staging_path, runtime_path) + except Exception as exc: # pragma: no cover - best effort mirror + self._logger.debug("Failed to mirror payload %s during update: %s", descriptor.guid, exc) + + descriptor.size_bytes = len(data) + descriptor.checksum = checksum + descriptor.updated_at = now + return descriptor + + def 
read_payload_bytes(self, descriptor: PayloadDescriptor) -> bytes: + """Retrieve payload content from the staging copy.""" + + staging_path = self._payload_dir(self._staging_root, descriptor.guid) / descriptor.file_name + return staging_path.read_bytes() + + def ensure_runtime_copy(self, descriptor: PayloadDescriptor) -> None: + """Ensure the runtime payload copy matches the staging content.""" + + staging_path = self._payload_dir(self._staging_root, descriptor.guid) / descriptor.file_name + runtime_path = self._payload_dir(self._runtime_root, descriptor.guid) / descriptor.file_name + if not staging_path.exists(): + self._logger.warning("Payload missing on disk; guid=%s path=%s", descriptor.guid, staging_path) + return + runtime_path.parent.mkdir(parents=True, exist_ok=True) + try: + shutil.copy2(staging_path, runtime_path) + except Exception as exc: # pragma: no cover - best effort mirror + self._logger.debug("Failed to mirror payload %s via ensure_runtime_copy: %s", descriptor.guid, exc) + + def delete_payload(self, descriptor: PayloadDescriptor) -> None: + """Remove staging and runtime payload files.""" + + for root in (self._staging_root, self._runtime_root): + dir_path = self._payload_dir(root, descriptor.guid) + file_path = dir_path / descriptor.file_name + try: + if file_path.exists(): + file_path.unlink() + if dir_path.exists() and not any(dir_path.iterdir()): + dir_path.rmdir() + except Exception as exc: # pragma: no cover - best effort cleanup + self._logger.debug("Failed to remove payload directory %s (%s): %s", descriptor.guid, root, exc) + + # ------------------------------------------------------------------ + # Helper methods + # ------------------------------------------------------------------ + def _payload_dir(self, root: Path, guid: str) -> Path: + return root / guid.lower().strip() + + @staticmethod + def _default_extension(payload_type: PayloadType) -> str: + if payload_type == PayloadType.SCRIPT: + return ".txt" + if payload_type == PayloadType.WORKFLOW: + return ".json" + return ".bin" + + @staticmethod + def _normalise_extension(extension: str) -> str: + value = (extension or "").strip() + if not value: + return ".bin" + if not value.startswith("."): + return f".{value}" + return value + + @staticmethod + def _normalise_guid(guid: str) -> str: + return guid.strip().lower() + diff --git a/Data/Engine/server.py b/Data/Engine/server.py index d1147f84..03e4447c 100644 --- a/Data/Engine/server.py +++ b/Data/Engine/server.py @@ -16,6 +16,7 @@ error log) to align with the project's operational practices. 
""" from __future__ import annotations +import atexit import importlib.util import logging import time @@ -96,8 +97,11 @@ if HttpProtocol is not None: # pragma: no branch - attribute exists in supporte _SOCKETIO_ASYNC_MODE = "eventlet" +_ASSEMBLY_SHUTDOWN_REGISTERED = False + from .config import EngineSettings, initialise_engine_logger, load_runtime_config +from .assembly_management import initialise_assembly_runtime @dataclass @@ -114,6 +118,7 @@ class EngineContext: config: Mapping[str, Any] api_groups: Sequence[str] api_log_path: str + assembly_cache: Optional[Any] = None __all__ = ["EngineContext", "create_app", "register_engine_api"] @@ -131,6 +136,7 @@ def _build_engine_context(settings: EngineSettings, logger: logging.Logger) -> E config=settings.as_dict(), api_groups=settings.api_groups, api_log_path=settings.api_log_file, + assembly_cache=None, ) @@ -160,6 +166,21 @@ def _attach_transition_logging(app: Flask, context: EngineContext, logger: loggi setattr(app, "_engine_api_logging_installed", True) +def _register_assembly_shutdown_hook(assembly_cache, logger: logging.Logger) -> None: + global _ASSEMBLY_SHUTDOWN_REGISTERED + if _ASSEMBLY_SHUTDOWN_REGISTERED: + return + + def _shutdown_assembly_cache() -> None: # pragma: no cover - process shutdown + try: + assembly_cache.shutdown(flush=True) + except Exception: + logger.debug("Failed to shut down assembly cache cleanly", exc_info=True) + + atexit.register(_shutdown_assembly_cache) + _ASSEMBLY_SHUTDOWN_REGISTERED = True + + def create_app(config: Optional[Mapping[str, Any]] = None) -> Tuple[Flask, SocketIO, EngineContext]: """Create the Stage 2 Engine Flask application.""" @@ -193,6 +214,11 @@ def create_app(config: Optional[Mapping[str, Any]] = None) -> Tuple[Flask, Socke context = _build_engine_context(settings, logger) context.socketio = socketio + assembly_cache = initialise_assembly_runtime(logger=logger, config=settings.as_dict()) + assembly_cache.reload() + context.assembly_cache = assembly_cache + _register_assembly_shutdown_hook(assembly_cache, logger) + api_logger = logging.getLogger("borealis.engine.api") if not api_logger.handlers: api_handler = TimedRotatingFileHandler( @@ -250,6 +276,11 @@ def register_engine_api(app: Flask, *, config: Optional[Mapping[str, Any]] = Non logger = initialise_engine_logger(settings) context = _build_engine_context(settings, logger) + assembly_cache = initialise_assembly_runtime(logger=logger, config=settings.as_dict()) + assembly_cache.reload() + context.assembly_cache = assembly_cache + _register_assembly_shutdown_hook(assembly_cache, logger) + from .services import API # Local import avoids circular dependency at module import time API.register_api(app, context)