diff --git a/Data/Engine/bootstrapper.py b/Data/Engine/bootstrapper.py index f16ce756..3a5e9248 100644 --- a/Data/Engine/bootstrapper.py +++ b/Data/Engine/bootstrapper.py @@ -51,7 +51,7 @@ def _build_runtime_config() -> Dict[str, Any]: if api_groups_override: api_groups: Any = api_groups_override else: - api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies") + api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs") return { "HOST": os.environ.get("BOREALIS_ENGINE_HOST", DEFAULT_HOST), diff --git a/Data/Engine/server.py b/Data/Engine/server.py index d895492f..9823f6f0 100644 --- a/Data/Engine/server.py +++ b/Data/Engine/server.py @@ -124,6 +124,7 @@ class EngineContext: database_path: str logger: logging.Logger scheduler: Any + socketio: Optional[Any] tls_cert_path: Optional[str] tls_key_path: Optional[str] tls_bundle_path: Optional[str] @@ -140,6 +141,7 @@ def _build_engine_context(settings: EngineSettings, logger: logging.Logger) -> E database_path=settings.database_path, logger=logger, scheduler=None, + socketio=None, tls_cert_path=settings.tls_cert_path, tls_key_path=settings.tls_key_path, tls_bundle_path=settings.tls_bundle_path, @@ -206,6 +208,7 @@ def create_app(config: Optional[Mapping[str, Any]] = None) -> Tuple[Flask, Socke ) context = _build_engine_context(settings, logger) + context.socketio = socketio api_logger = logging.getLogger("borealis.engine.api") if not api_logger.handlers: diff --git a/Data/Engine/services/API/__init__.py b/Data/Engine/services/API/__init__.py index 3692d991..459d435c 100644 --- a/Data/Engine/services/API/__init__.py +++ b/Data/Engine/services/API/__init__.py @@ -33,10 +33,12 @@ from .tokens import routes as token_routes from ...server import EngineContext from .access_management.login import register_auth from .assemblies.management import register_assemblies +from .devices import routes as device_routes from .devices.approval import register_admin_endpoints from .devices.management import register_management +from .scheduled_jobs import management as scheduled_jobs_management -DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "assemblies") +DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs") _SERVER_SCOPE_PATTERN = re.compile(r"\\b(?:scope|context|agent_context)=([A-Za-z0-9_-]+)", re.IGNORECASE) _SERVER_AGENT_ID_PATTERN = re.compile(r"\\bagent_id=([^\\s,]+)", re.IGNORECASE) @@ -207,6 +209,11 @@ def _register_enrollment(app: Flask, adapters: EngineServiceAdapters) -> None: def _register_devices(app: Flask, adapters: EngineServiceAdapters) -> None: register_management(app, adapters) register_admin_endpoints(app, adapters) + device_routes.register_agents(app, adapters) + + +def _register_scheduled_jobs(app: Flask, adapters: EngineServiceAdapters) -> None: + scheduled_jobs_management.register_management(app, adapters) _GROUP_REGISTRARS: Mapping[str, Callable[[Flask, EngineServiceAdapters], None]] = { @@ -215,6 +222,7 @@ _GROUP_REGISTRARS: Mapping[str, Callable[[Flask, EngineServiceAdapters], None]] "enrollment": _register_enrollment, "devices": _register_devices, "assemblies": register_assemblies, + "scheduled_jobs": _register_scheduled_jobs, } diff --git a/Data/Engine/services/API/assemblies/execution.py b/Data/Engine/services/API/assemblies/execution.py new file mode 100644 index 00000000..eec87497 --- /dev/null +++ b/Data/Engine/services/API/assemblies/execution.py @@ -0,0 +1,274 @@ +# ====================================================== +# Data\Engine\services\API\assemblies\execution.py +# Description: Quick job dispatch and activity history endpoints for script and playbook assemblies. +# +# API Endpoints (if applicable): +# - POST /api/scripts/quick_run (Token Authenticated) - Queues a PowerShell assembly for execution on agents. +# - POST /api/ansible/quick_run (Token Authenticated) - (Not Yet Implemented) Placeholder for Ansible assemblies. +# - GET/DELETE /api/device/activity/ (Token Authenticated) - Retrieves or clears device activity history. +# - GET /api/device/activity/job/ (Token Authenticated) - Retrieves a specific activity record. +# ====================================================== + +"""Assembly execution helpers for the Borealis Engine runtime.""" +from __future__ import annotations + +import base64 +import os +import time +from typing import TYPE_CHECKING, Any, Dict, List + +from flask import Blueprint, jsonify, request + +from ..scheduled_jobs.management import ensure_scheduler, get_scheduler + +if TYPE_CHECKING: # pragma: no cover - typing aide + from flask import Flask + + from .. import EngineServiceAdapters + + +def _normalize_hostnames(value: Any) -> List[str]: + if not isinstance(value, list): + return [] + hosts: List[str] = [] + for item in value: + name = str(item or "").strip() + if name: + hosts.append(name) + return hosts + + +def register_execution(app: "Flask", adapters: "EngineServiceAdapters") -> None: + """Register quick execution endpoints for assemblies.""" + + ensure_scheduler(app, adapters) + blueprint = Blueprint("assemblies_execution", __name__) + service_log = adapters.service_log + + @blueprint.route("/api/scripts/quick_run", methods=["POST"]) + def scripts_quick_run(): + scheduler = get_scheduler(adapters) + data = request.get_json(silent=True) or {} + rel_path = (data.get("script_path") or "").strip() + hostnames = _normalize_hostnames(data.get("hostnames")) + run_mode = (data.get("run_mode") or "system").strip().lower() + + if not rel_path or not hostnames: + return jsonify({"error": "Missing script_path or hostnames[]"}), 400 + + scripts_root = scheduler._scripts_root() # type: ignore[attr-defined] + abs_path = os.path.abspath(os.path.join(scripts_root, rel_path)) + if ( + not abs_path.startswith(scripts_root) + or not scheduler._is_valid_scripts_relpath(rel_path) # type: ignore[attr-defined] + or not os.path.isfile(abs_path) + ): + return jsonify({"error": "Script not found"}), 404 + + doc = scheduler._load_assembly_document(abs_path, "scripts") # type: ignore[attr-defined] + script_type = (doc.get("type") or "powershell").lower() + if script_type != "powershell": + return jsonify({"error": f"Unsupported script type '{script_type}'. Only PowerShell is supported."}), 400 + + content = doc.get("script") or "" + doc_variables = doc.get("variables") if isinstance(doc.get("variables"), list) else [] + overrides_raw = data.get("variable_values") + overrides: Dict[str, Any] = {} + if isinstance(overrides_raw, dict): + for key, val in overrides_raw.items(): + name = str(key or "").strip() + if not name: + continue + overrides[name] = val + + env_map, variables, literal_lookup = scheduler._prepare_variable_context(doc_variables, overrides) # type: ignore[attr-defined] + content = scheduler._rewrite_powershell_script(content, literal_lookup) # type: ignore[attr-defined] + normalized_script = (content or "").replace("\r\n", "\n") + script_bytes = normalized_script.encode("utf-8") + encoded_content = ( + base64.b64encode(script_bytes).decode("ascii") + if script_bytes or normalized_script == "" + else "" + ) + + signature_b64 = "" + signing_key_b64 = "" + script_signer = adapters.script_signer + if script_signer is not None: + try: + signature_raw = script_signer.sign(script_bytes) + signature_b64 = base64.b64encode(signature_raw).decode("ascii") + signing_key_b64 = script_signer.public_base64_spki() + except Exception: + signature_b64 = "" + signing_key_b64 = "" + + try: + timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0)) + except Exception: + timeout_seconds = 0 + + friendly_name = (doc.get("name") or "").strip() or os.path.basename(abs_path) + now = int(time.time()) + results: List[Dict[str, Any]] = [] + socketio = getattr(adapters.context, "socketio", None) + if socketio is None: + return jsonify({"error": "Realtime transport unavailable; cannot dispatch quick job."}), 500 + + conn = None + try: + conn = adapters.db_conn_factory() + cur = conn.cursor() + for host in hostnames: + cur.execute( + """ + INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr) + VALUES(?,?,?,?,?,?,?,?) + """, + ( + host, + rel_path.replace(os.sep, "/"), + friendly_name, + script_type, + now, + "Running", + "", + "", + ), + ) + job_id = cur.lastrowid + conn.commit() + + payload = { + "job_id": job_id, + "target_hostname": host, + "script_type": script_type, + "script_name": friendly_name, + "script_path": rel_path.replace(os.sep, "/"), + "script_content": encoded_content, + "script_encoding": "base64", + "environment": env_map, + "variables": variables, + "timeout_seconds": timeout_seconds, + "files": doc.get("files") if isinstance(doc.get("files"), list) else [], + "run_mode": run_mode, + } + if signature_b64: + payload["signature"] = signature_b64 + payload["sig_alg"] = "ed25519" + if signing_key_b64: + payload["signing_key"] = signing_key_b64 + + socketio.emit("quick_job_run", payload) + try: + socketio.emit( + "device_activity_changed", + { + "hostname": host, + "activity_id": job_id, + "change": "created", + "source": "quick_job", + }, + ) + except Exception: + pass + + results.append({"hostname": host, "job_id": job_id, "status": "Running"}) + service_log( + "assemblies", + f"quick job queued hostname={host} path={rel_path} run_mode={run_mode}", + ) + except Exception as exc: + if conn is not None: + conn.rollback() + return jsonify({"error": str(exc)}), 500 + finally: + if conn is not None: + conn.close() + + return jsonify({"results": results}) + + @blueprint.route("/api/ansible/quick_run", methods=["POST"]) + def ansible_quick_run(): + return jsonify({"error": "Ansible quick run is not yet available in the Engine runtime."}), 501 + + @blueprint.route("/api/device/activity/", methods=["GET", "DELETE"]) + def device_activity(hostname: str): + conn = None + try: + conn = adapters.db_conn_factory() + cur = conn.cursor() + if request.method == "DELETE": + cur.execute("DELETE FROM activity_history WHERE hostname = ?", (hostname,)) + conn.commit() + return jsonify({"status": "ok"}) + + cur.execute( + """ + SELECT id, script_name, script_path, script_type, ran_at, status, LENGTH(stdout), LENGTH(stderr) + FROM activity_history + WHERE hostname = ? + ORDER BY ran_at DESC, id DESC + """, + (hostname,), + ) + rows = cur.fetchall() + history = [] + for jid, name, path, stype, ran_at, status, so_len, se_len in rows: + history.append( + { + "id": jid, + "script_name": name, + "script_path": path, + "script_type": stype, + "ran_at": ran_at, + "status": status, + "has_stdout": bool(so_len or 0), + "has_stderr": bool(se_len or 0), + } + ) + return jsonify({"history": history}) + except Exception as exc: + return jsonify({"error": str(exc)}), 500 + finally: + if conn is not None: + conn.close() + + @blueprint.route("/api/device/activity/job/", methods=["GET"]) + def device_activity_job(job_id: int): + conn = None + try: + conn = adapters.db_conn_factory() + cur = conn.cursor() + cur.execute( + """ + SELECT id, hostname, script_name, script_path, script_type, ran_at, status, stdout, stderr + FROM activity_history + WHERE id = ? + """, + (job_id,), + ) + row = cur.fetchone() + if not row: + return jsonify({"error": "Not found"}), 404 + (jid, hostname, name, path, stype, ran_at, status, stdout, stderr) = row + return jsonify( + { + "id": jid, + "hostname": hostname, + "script_name": name, + "script_path": path, + "script_type": stype, + "ran_at": ran_at, + "status": status, + "stdout": stdout or "", + "stderr": stderr or "", + } + ) + except Exception as exc: + return jsonify({"error": str(exc)}), 500 + finally: + if conn is not None: + conn.close() + + app.register_blueprint(blueprint) diff --git a/Data/Engine/services/API/assemblies/management.py b/Data/Engine/services/API/assemblies/management.py index 4349151d..f839b42a 100644 --- a/Data/Engine/services/API/assemblies/management.py +++ b/Data/Engine/services/API/assemblies/management.py @@ -27,6 +27,8 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, MutableMapping, Opti from flask import Blueprint, jsonify, request +from . import execution as assemblies_execution + if TYPE_CHECKING: # pragma: no cover - typing aide from .. import EngineServiceAdapters @@ -726,4 +728,5 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: return jsonify(response), status app.register_blueprint(blueprint) + assemblies_execution.register_execution(app, adapters) diff --git a/Data/Engine/services/API/devices/routes.py b/Data/Engine/services/API/devices/routes.py new file mode 100644 index 00000000..9b741bb1 --- /dev/null +++ b/Data/Engine/services/API/devices/routes.py @@ -0,0 +1,221 @@ +# ====================================================== +# Data\Engine\services\API\devices\routes.py +# Description: Agent heartbeat and script polling endpoints aligned with device management APIs. +# +# API Endpoints (if applicable): +# - POST /api/agent/heartbeat (Device Authenticated) - Updates device last-seen metadata and inventory snapshots. +# - POST /api/agent/script/request (Device Authenticated) - Provides script execution payloads or idle signals to agents. +# ====================================================== + +"""Device-affiliated agent endpoints for the Borealis Engine runtime.""" +from __future__ import annotations + +import json +import sqlite3 +import time +from typing import TYPE_CHECKING, Any, Dict, Optional + +from flask import Blueprint, jsonify, request, g + +from ....auth.device_auth import AGENT_CONTEXT_HEADER, require_device_auth +from ....auth.guid_utils import normalize_guid + +if TYPE_CHECKING: # pragma: no cover - typing aide + from .. import EngineServiceAdapters + + +def _canonical_context(value: Optional[str]) -> Optional[str]: + if not value: + return None + cleaned = "".join(ch for ch in str(value) if ch.isalnum() or ch in ("_", "-")) + if not cleaned: + return None + return cleaned.upper() + + +def _json_or_none(value: Any) -> Optional[str]: + if value is None: + return None + try: + return json.dumps(value) + except Exception: + return None + + +def register_agents(app, adapters: "EngineServiceAdapters") -> None: + """Register agent heartbeat and script polling routes.""" + + blueprint = Blueprint("agents", __name__) + auth_manager = adapters.device_auth_manager + log = adapters.service_log + db_conn_factory = adapters.db_conn_factory + script_signer = adapters.script_signer + + def _context_hint(ctx: Optional[Any] = None) -> Optional[str]: + if ctx is not None and getattr(ctx, "service_mode", None): + return _canonical_context(getattr(ctx, "service_mode", None)) + return _canonical_context(request.headers.get(AGENT_CONTEXT_HEADER)) + + def _auth_context() -> Any: + ctx = getattr(g, "device_auth", None) + if ctx is None: + log("agents", f"device auth context missing for {request.path}", _context_hint()) + return ctx + + @blueprint.route("/api/agent/heartbeat", methods=["POST"]) + @require_device_auth(auth_manager) + def heartbeat(): + ctx = _auth_context() + if ctx is None: + return jsonify({"error": "auth_context_missing"}), 500 + + payload = request.get_json(force=True, silent=True) or {} + context_label = _context_hint(ctx) + now_ts = int(time.time()) + + updates: Dict[str, Optional[str]] = {"last_seen": now_ts} + + hostname = payload.get("hostname") + if isinstance(hostname, str) and hostname.strip(): + updates["hostname"] = hostname.strip() + + inventory = payload.get("inventory") if isinstance(payload.get("inventory"), dict) else {} + for key in ("memory", "network", "software", "storage", "cpu"): + if key in inventory and inventory[key] is not None: + encoded = _json_or_none(inventory[key]) + if encoded is not None: + updates[key] = encoded + + metrics = payload.get("metrics") if isinstance(payload.get("metrics"), dict) else {} + if metrics.get("last_user"): + updates["last_user"] = str(metrics["last_user"]) + if metrics.get("operating_system"): + updates["operating_system"] = str(metrics["operating_system"]) + if metrics.get("uptime") is not None: + try: + updates["uptime"] = int(metrics["uptime"]) + except Exception: + pass + + for field in ("external_ip", "internal_ip", "device_type"): + if payload.get(field): + updates[field] = str(payload[field]) + + conn = db_conn_factory() + try: + cur = conn.cursor() + + def _apply_updates() -> int: + if not updates: + return 0 + columns = ", ".join(f"{col} = ?" for col in updates.keys()) + values = list(updates.values()) + normalized_guid = normalize_guid(ctx.guid) + selected_guid: Optional[str] = None + if normalized_guid: + cur.execute( + "SELECT guid FROM devices WHERE UPPER(guid) = ?", + (normalized_guid,), + ) + rows = cur.fetchall() + for (stored_guid,) in rows or []: + if stored_guid == ctx.guid: + selected_guid = stored_guid + break + if not selected_guid and rows: + selected_guid = rows[0][0] + target_guid = selected_guid or ctx.guid + cur.execute( + f"UPDATE devices SET {columns} WHERE guid = ?", + values + [target_guid], + ) + updated = cur.rowcount + if updated > 0 and normalized_guid and target_guid != normalized_guid: + try: + cur.execute( + "UPDATE devices SET guid = ? WHERE guid = ?", + (normalized_guid, target_guid), + ) + except sqlite3.IntegrityError: + pass + return updated + + try: + rowcount = _apply_updates() + except sqlite3.IntegrityError as exc: + if "devices.hostname" in str(exc) and "UNIQUE" in str(exc).upper(): + existing_guid_for_hostname: Optional[str] = None + if "hostname" in updates: + try: + cur.execute( + "SELECT guid FROM devices WHERE hostname = ?", + (updates["hostname"],), + ) + row = cur.fetchone() + if row and row[0]: + existing_guid_for_hostname = normalize_guid(row[0]) + except Exception: + existing_guid_for_hostname = None + updates.pop("hostname", None) + rowcount = _apply_updates() + try: + current_guid = normalize_guid(ctx.guid) + except Exception: + current_guid = ctx.guid + if ( + existing_guid_for_hostname + and current_guid + and existing_guid_for_hostname != current_guid + ): + log( + "agents", + f"heartbeat hostname collision ignored for guid={ctx.guid}", + context_label, + level="WARNING", + ) + else: + raise + + if rowcount == 0: + log("agents", f"heartbeat missing device record guid={ctx.guid}", context_label, level="ERROR") + return jsonify({"error": "device_not_registered"}), 404 + conn.commit() + finally: + conn.close() + + return jsonify({"status": "ok", "poll_after_ms": 15000}) + + @blueprint.route("/api/agent/script/request", methods=["POST"]) + @require_device_auth(auth_manager) + def script_request(): + ctx = _auth_context() + if ctx is None: + return jsonify({"error": "auth_context_missing"}), 500 + + signing_key = "" + if script_signer is not None: + try: + signing_key = script_signer.public_base64_spki() + except Exception: + signing_key = "" + + if ctx.status != "active": + return jsonify( + { + "status": "quarantined", + "poll_after_ms": 60000, + "sig_alg": "ed25519", + "signing_key": signing_key, + } + ) + + return jsonify( + { + "status": "idle", + "poll_after_ms": 30000, + "sig_alg": "ed25519", + "signing_key": signing_key, + } + ) + + app.register_blueprint(blueprint) diff --git a/Data/Engine/services/API/scheduled_jobs/management.py b/Data/Engine/services/API/scheduled_jobs/management.py index a2d7ed1e..3327aea8 100644 --- a/Data/Engine/services/API/scheduled_jobs/management.py +++ b/Data/Engine/services/API/scheduled_jobs/management.py @@ -1,8 +1,82 @@ # ====================================================== # Data\Engine\services\API\scheduled_jobs\management.py -# Description: Placeholder for scheduled job management endpoints. +# Description: Integrates the legacy job scheduler for CRUD operations within the Engine API. # -# API Endpoints (if applicable): None +# API Endpoints (if applicable): +# - GET /api/scheduled_jobs (Token Authenticated) - Lists scheduled jobs with summary metadata. +# - POST /api/scheduled_jobs (Token Authenticated) - Creates a new scheduled job definition. +# - GET /api/scheduled_jobs/ (Token Authenticated) - Retrieves a scheduled job. +# - PUT /api/scheduled_jobs/ (Token Authenticated) - Updates a scheduled job. +# - POST /api/scheduled_jobs//toggle (Token Authenticated) - Enables or disables a job. +# - DELETE /api/scheduled_jobs/ (Token Authenticated) - Deletes a job. +# - GET /api/scheduled_jobs//runs (Token Authenticated) - Lists run history for a job. +# - GET /api/scheduled_jobs//devices (Token Authenticated) - Summarises device results for a run. +# - DELETE /api/scheduled_jobs//runs (Token Authenticated) - Clears run history for a job. # ====================================================== -"Placeholder for API module scheduled_jobs/management.py." +"""Scheduled job management integration for the Borealis Engine runtime.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +try: # pragma: no cover - legacy module import guard + import job_scheduler as legacy_job_scheduler # type: ignore +except Exception as exc: # pragma: no cover - runtime guard + legacy_job_scheduler = None # type: ignore + _SCHEDULER_IMPORT_ERROR = exc +else: + _SCHEDULER_IMPORT_ERROR = None + +if TYPE_CHECKING: # pragma: no cover - typing aide + from flask import Flask + + from .. import EngineServiceAdapters + + +def _raise_scheduler_import() -> None: + if _SCHEDULER_IMPORT_ERROR is not None: + raise RuntimeError( + "Legacy job scheduler module could not be imported; ensure Data/Server/job_scheduler.py " + "remains available during the Engine migration." + ) from _SCHEDULER_IMPORT_ERROR + + +def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"): + """Instantiate the legacy job scheduler and attach it to the Engine context.""" + + if getattr(adapters.context, "scheduler", None) is not None: + return adapters.context.scheduler + + _raise_scheduler_import() + + socketio = getattr(adapters.context, "socketio", None) + if socketio is None: + raise RuntimeError("Socket.IO instance is required to initialise the scheduled job service.") + + database_path = adapters.context.database_path + script_signer = adapters.script_signer + + scheduler = legacy_job_scheduler.register( + app, + socketio, + database_path, + script_signer=script_signer, + ) + scheduler.start() + adapters.context.scheduler = scheduler + adapters.service_log("scheduled_jobs", "legacy scheduler initialised", level="INFO") + return scheduler + + +def get_scheduler(adapters: "EngineServiceAdapters"): + scheduler = getattr(adapters.context, "scheduler", None) + if scheduler is None: + raise RuntimeError("Scheduled job service has not been initialised.") + return scheduler + + +def register_management(app: "Flask", adapters: "EngineServiceAdapters") -> None: + """Ensure scheduled job routes are registered via the legacy scheduler.""" + + ensure_scheduler(app, adapters) +