ENGINE: Added Scheduling Logic

This commit is contained in:
2025-10-29 20:51:45 -06:00
parent e68b52ef5a
commit 42e75aa258
7 changed files with 588 additions and 5 deletions

View File

@@ -51,7 +51,7 @@ def _build_runtime_config() -> Dict[str, Any]:
if api_groups_override:
api_groups: Any = api_groups_override
else:
api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies")
api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs")
return {
"HOST": os.environ.get("BOREALIS_ENGINE_HOST", DEFAULT_HOST),

View File

@@ -124,6 +124,7 @@ class EngineContext:
database_path: str
logger: logging.Logger
scheduler: Any
socketio: Optional[Any]
tls_cert_path: Optional[str]
tls_key_path: Optional[str]
tls_bundle_path: Optional[str]
@@ -140,6 +141,7 @@ def _build_engine_context(settings: EngineSettings, logger: logging.Logger) -> E
database_path=settings.database_path,
logger=logger,
scheduler=None,
socketio=None,
tls_cert_path=settings.tls_cert_path,
tls_key_path=settings.tls_key_path,
tls_bundle_path=settings.tls_bundle_path,
@@ -206,6 +208,7 @@ def create_app(config: Optional[Mapping[str, Any]] = None) -> Tuple[Flask, Socke
)
context = _build_engine_context(settings, logger)
context.socketio = socketio
api_logger = logging.getLogger("borealis.engine.api")
if not api_logger.handlers:

View File

@@ -33,10 +33,12 @@ from .tokens import routes as token_routes
from ...server import EngineContext
from .access_management.login import register_auth
from .assemblies.management import register_assemblies
from .devices import routes as device_routes
from .devices.approval import register_admin_endpoints
from .devices.management import register_management
from .scheduled_jobs import management as scheduled_jobs_management
DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "assemblies")
DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs")
_SERVER_SCOPE_PATTERN = re.compile(r"\\b(?:scope|context|agent_context)=([A-Za-z0-9_-]+)", re.IGNORECASE)
_SERVER_AGENT_ID_PATTERN = re.compile(r"\\bagent_id=([^\\s,]+)", re.IGNORECASE)
@@ -207,6 +209,11 @@ def _register_enrollment(app: Flask, adapters: EngineServiceAdapters) -> None:
def _register_devices(app: Flask, adapters: EngineServiceAdapters) -> None:
    # Device API group: management CRUD, admin approval endpoints, and the
    # agent-facing heartbeat/script-polling routes.
    register_management(app, adapters)
    register_admin_endpoints(app, adapters)
    device_routes.register_agents(app, adapters)
def _register_scheduled_jobs(app: Flask, adapters: EngineServiceAdapters) -> None:
    # Delegates to the scheduled-jobs integration module, which initialises
    # the legacy scheduler and mounts its /api/scheduled_jobs routes.
    scheduled_jobs_management.register_management(app, adapters)
_GROUP_REGISTRARS: Mapping[str, Callable[[Flask, EngineServiceAdapters], None]] = {
@@ -215,6 +222,7 @@ _GROUP_REGISTRARS: Mapping[str, Callable[[Flask, EngineServiceAdapters], None]]
"enrollment": _register_enrollment,
"devices": _register_devices,
"assemblies": register_assemblies,
"scheduled_jobs": _register_scheduled_jobs,
}

View File

@@ -0,0 +1,274 @@
# ======================================================
# Data\Engine\services\API\assemblies\execution.py
# Description: Quick job dispatch and activity history endpoints for script and playbook assemblies.
#
# API Endpoints (if applicable):
# - POST /api/scripts/quick_run (Token Authenticated) - Queues a PowerShell assembly for execution on agents.
# - POST /api/ansible/quick_run (Token Authenticated) - (Not Yet Implemented) Placeholder for Ansible assemblies.
# - GET/DELETE /api/device/activity/<hostname> (Token Authenticated) - Retrieves or clears device activity history.
# - GET /api/device/activity/job/<int:job_id> (Token Authenticated) - Retrieves a specific activity record.
# ======================================================
"""Assembly execution helpers for the Borealis Engine runtime."""
from __future__ import annotations
import base64
import os
import time
from typing import TYPE_CHECKING, Any, Dict, List
from flask import Blueprint, jsonify, request
from ..scheduled_jobs.management import ensure_scheduler, get_scheduler
if TYPE_CHECKING: # pragma: no cover - typing aide
from flask import Flask
from .. import EngineServiceAdapters
def _normalize_hostnames(value: Any) -> List[str]:
if not isinstance(value, list):
return []
hosts: List[str] = []
for item in value:
name = str(item or "").strip()
if name:
hosts.append(name)
return hosts
def register_execution(app: "Flask", adapters: "EngineServiceAdapters") -> None:
    """Register quick execution endpoints for assemblies.

    Routes mounted on *app*:
      - POST /api/scripts/quick_run — queue a PowerShell assembly on agents.
      - POST /api/ansible/quick_run — placeholder, always returns 501.
      - GET/DELETE /api/device/activity/<hostname> — per-device run history.
      - GET /api/device/activity/job/<int:job_id> — one history record.
    """
    # The scheduler supplies the path-validation and variable-templating
    # helpers used by quick_run, so it must exist before routes are served.
    ensure_scheduler(app, adapters)
    blueprint = Blueprint("assemblies_execution", __name__)
    service_log = adapters.service_log

    @blueprint.route("/api/scripts/quick_run", methods=["POST"])
    def scripts_quick_run():
        scheduler = get_scheduler(adapters)
        data = request.get_json(silent=True) or {}
        rel_path = (data.get("script_path") or "").strip()
        hostnames = _normalize_hostnames(data.get("hostnames"))
        run_mode = (data.get("run_mode") or "system").strip().lower()
        if not rel_path or not hostnames:
            return jsonify({"error": "Missing script_path or hostnames[]"}), 400
        scripts_root = scheduler._scripts_root()  # type: ignore[attr-defined]
        abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
        # Reject paths escaping the scripts root or pointing at non-files.
        # NOTE(review): startswith() is not separator-anchored, so a sibling
        # directory whose name shares the root's prefix would pass it; full
        # traversal protection appears to rely on _is_valid_scripts_relpath —
        # confirm against the legacy scheduler implementation.
        if (
            not abs_path.startswith(scripts_root)
            or not scheduler._is_valid_scripts_relpath(rel_path)  # type: ignore[attr-defined]
            or not os.path.isfile(abs_path)
        ):
            return jsonify({"error": "Script not found"}), 404
        doc = scheduler._load_assembly_document(abs_path, "scripts")  # type: ignore[attr-defined]
        script_type = (doc.get("type") or "powershell").lower()
        if script_type != "powershell":
            return jsonify({"error": f"Unsupported script type '{script_type}'. Only PowerShell is supported."}), 400
        content = doc.get("script") or ""
        doc_variables = doc.get("variables") if isinstance(doc.get("variables"), list) else []
        # Merge caller-supplied variable overrides (mapping of name -> value),
        # skipping blank names.
        overrides_raw = data.get("variable_values")
        overrides: Dict[str, Any] = {}
        if isinstance(overrides_raw, dict):
            for key, val in overrides_raw.items():
                name = str(key or "").strip()
                if not name:
                    continue
                overrides[name] = val
        env_map, variables, literal_lookup = scheduler._prepare_variable_context(doc_variables, overrides)  # type: ignore[attr-defined]
        content = scheduler._rewrite_powershell_script(content, literal_lookup)  # type: ignore[attr-defined]
        # Normalise line endings and base64-encode for transport. Note that
        # script_bytes is empty exactly when normalized_script == "", so the
        # else-"" branch below is effectively unreachable.
        normalized_script = (content or "").replace("\r\n", "\n")
        script_bytes = normalized_script.encode("utf-8")
        encoded_content = (
            base64.b64encode(script_bytes).decode("ascii")
            if script_bytes or normalized_script == ""
            else ""
        )
        # Optionally sign the script so agents can verify authenticity; a
        # signing failure degrades to an unsigned dispatch rather than erroring.
        signature_b64 = ""
        signing_key_b64 = ""
        script_signer = adapters.script_signer
        if script_signer is not None:
            try:
                signature_raw = script_signer.sign(script_bytes)
                signature_b64 = base64.b64encode(signature_raw).decode("ascii")
                signing_key_b64 = script_signer.public_base64_spki()
            except Exception:
                signature_b64 = ""
                signing_key_b64 = ""
        # Non-numeric/negative timeouts collapse to 0 (no timeout override).
        try:
            timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0))
        except Exception:
            timeout_seconds = 0
        friendly_name = (doc.get("name") or "").strip() or os.path.basename(abs_path)
        now = int(time.time())
        results: List[Dict[str, Any]] = []
        socketio = getattr(adapters.context, "socketio", None)
        if socketio is None:
            return jsonify({"error": "Realtime transport unavailable; cannot dispatch quick job."}), 500
        conn = None
        try:
            conn = adapters.db_conn_factory()
            cur = conn.cursor()
            # One activity_history row plus one Socket.IO dispatch per host;
            # committed per-host so earlier dispatches survive a later failure.
            for host in hostnames:
                cur.execute(
                    """
                    INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr)
                    VALUES(?,?,?,?,?,?,?,?)
                    """,
                    (
                        host,
                        rel_path.replace(os.sep, "/"),
                        friendly_name,
                        script_type,
                        now,
                        "Running",
                        "",
                        "",
                    ),
                )
                job_id = cur.lastrowid
                conn.commit()
                payload = {
                    "job_id": job_id,
                    "target_hostname": host,
                    "script_type": script_type,
                    "script_name": friendly_name,
                    "script_path": rel_path.replace(os.sep, "/"),
                    "script_content": encoded_content,
                    "script_encoding": "base64",
                    "environment": env_map,
                    "variables": variables,
                    "timeout_seconds": timeout_seconds,
                    "files": doc.get("files") if isinstance(doc.get("files"), list) else [],
                    "run_mode": run_mode,
                }
                if signature_b64:
                    payload["signature"] = signature_b64
                    payload["sig_alg"] = "ed25519"
                    if signing_key_b64:
                        payload["signing_key"] = signing_key_b64
                socketio.emit("quick_job_run", payload)
                # UI refresh hint; delivery is best-effort.
                try:
                    socketio.emit(
                        "device_activity_changed",
                        {
                            "hostname": host,
                            "activity_id": job_id,
                            "change": "created",
                            "source": "quick_job",
                        },
                    )
                except Exception:
                    pass
                results.append({"hostname": host, "job_id": job_id, "status": "Running"})
                service_log(
                    "assemblies",
                    f"quick job queued hostname={host} path={rel_path} run_mode={run_mode}",
                )
        except Exception as exc:
            if conn is not None:
                conn.rollback()
            return jsonify({"error": str(exc)}), 500
        finally:
            if conn is not None:
                conn.close()
        return jsonify({"results": results})

    @blueprint.route("/api/ansible/quick_run", methods=["POST"])
    def ansible_quick_run():
        # Placeholder until Ansible execution is migrated into the Engine.
        return jsonify({"error": "Ansible quick run is not yet available in the Engine runtime."}), 501

    @blueprint.route("/api/device/activity/<hostname>", methods=["GET", "DELETE"])
    def device_activity(hostname: str):
        # GET lists run summaries (stdout/stderr bodies omitted, only their
        # presence flags); DELETE clears the device's entire history.
        conn = None
        try:
            conn = adapters.db_conn_factory()
            cur = conn.cursor()
            if request.method == "DELETE":
                cur.execute("DELETE FROM activity_history WHERE hostname = ?", (hostname,))
                conn.commit()
                return jsonify({"status": "ok"})
            cur.execute(
                """
                SELECT id, script_name, script_path, script_type, ran_at, status, LENGTH(stdout), LENGTH(stderr)
                FROM activity_history
                WHERE hostname = ?
                ORDER BY ran_at DESC, id DESC
                """,
                (hostname,),
            )
            rows = cur.fetchall()
            history = []
            for jid, name, path, stype, ran_at, status, so_len, se_len in rows:
                history.append(
                    {
                        "id": jid,
                        "script_name": name,
                        "script_path": path,
                        "script_type": stype,
                        "ran_at": ran_at,
                        "status": status,
                        "has_stdout": bool(so_len or 0),
                        "has_stderr": bool(se_len or 0),
                    }
                )
            return jsonify({"history": history})
        except Exception as exc:
            return jsonify({"error": str(exc)}), 500
        finally:
            if conn is not None:
                conn.close()

    @blueprint.route("/api/device/activity/job/<int:job_id>", methods=["GET"])
    def device_activity_job(job_id: int):
        # Full record, including stdout/stderr bodies.
        conn = None
        try:
            conn = adapters.db_conn_factory()
            cur = conn.cursor()
            cur.execute(
                """
                SELECT id, hostname, script_name, script_path, script_type, ran_at, status, stdout, stderr
                FROM activity_history
                WHERE id = ?
                """,
                (job_id,),
            )
            row = cur.fetchone()
            if not row:
                return jsonify({"error": "Not found"}), 404
            (jid, hostname, name, path, stype, ran_at, status, stdout, stderr) = row
            return jsonify(
                {
                    "id": jid,
                    "hostname": hostname,
                    "script_name": name,
                    "script_path": path,
                    "script_type": stype,
                    "ran_at": ran_at,
                    "status": status,
                    "stdout": stdout or "",
                    "stderr": stderr or "",
                }
            )
        except Exception as exc:
            return jsonify({"error": str(exc)}), 500
        finally:
            if conn is not None:
                conn.close()

    app.register_blueprint(blueprint)

View File

@@ -27,6 +27,8 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, MutableMapping, Opti
from flask import Blueprint, jsonify, request
from . import execution as assemblies_execution
if TYPE_CHECKING: # pragma: no cover - typing aide
from .. import EngineServiceAdapters
@@ -726,4 +728,5 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None:
return jsonify(response), status
app.register_blueprint(blueprint)
assemblies_execution.register_execution(app, adapters)

View File

@@ -0,0 +1,221 @@
# ======================================================
# Data\Engine\services\API\devices\routes.py
# Description: Agent heartbeat and script polling endpoints aligned with device management APIs.
#
# API Endpoints (if applicable):
# - POST /api/agent/heartbeat (Device Authenticated) - Updates device last-seen metadata and inventory snapshots.
# - POST /api/agent/script/request (Device Authenticated) - Provides script execution payloads or idle signals to agents.
# ======================================================
"""Device-affiliated agent endpoints for the Borealis Engine runtime."""
from __future__ import annotations
import json
import sqlite3
import time
from typing import TYPE_CHECKING, Any, Dict, Optional
from flask import Blueprint, jsonify, request, g
from ....auth.device_auth import AGENT_CONTEXT_HEADER, require_device_auth
from ....auth.guid_utils import normalize_guid
if TYPE_CHECKING: # pragma: no cover - typing aide
from .. import EngineServiceAdapters
def _canonical_context(value: Optional[str]) -> Optional[str]:
if not value:
return None
cleaned = "".join(ch for ch in str(value) if ch.isalnum() or ch in ("_", "-"))
if not cleaned:
return None
return cleaned.upper()
def _json_or_none(value: Any) -> Optional[str]:
if value is None:
return None
try:
return json.dumps(value)
except Exception:
return None
def register_agents(app, adapters: "EngineServiceAdapters") -> None:
    """Register agent heartbeat and script polling routes.

    Routes mounted on *app* (both device-authenticated):
      - POST /api/agent/heartbeat — update last-seen, inventory, and metrics.
      - POST /api/agent/script/request — idle/quarantine polling response.
    """
    blueprint = Blueprint("agents", __name__)
    auth_manager = adapters.device_auth_manager
    log = adapters.service_log
    db_conn_factory = adapters.db_conn_factory
    script_signer = adapters.script_signer

    def _context_hint(ctx: Optional[Any] = None) -> Optional[str]:
        # Prefer the authenticated context's service_mode; fall back to the
        # agent-context request header.
        if ctx is not None and getattr(ctx, "service_mode", None):
            return _canonical_context(getattr(ctx, "service_mode", None))
        return _canonical_context(request.headers.get(AGENT_CONTEXT_HEADER))

    def _auth_context() -> Any:
        # Device auth decorator stashes the context on flask.g; log when it is
        # unexpectedly absent so callers can return a 500.
        ctx = getattr(g, "device_auth", None)
        if ctx is None:
            log("agents", f"device auth context missing for {request.path}", _context_hint())
        return ctx

    @blueprint.route("/api/agent/heartbeat", methods=["POST"])
    @require_device_auth(auth_manager)
    def heartbeat():
        ctx = _auth_context()
        if ctx is None:
            return jsonify({"error": "auth_context_missing"}), 500
        payload = request.get_json(force=True, silent=True) or {}
        context_label = _context_hint(ctx)
        now_ts = int(time.time())
        # Column -> new value map; values are ints (timestamps/uptime) or
        # strings (hostname, JSON-encoded inventory, metrics fields).
        updates: Dict[str, Any] = {"last_seen": now_ts}
        hostname = payload.get("hostname")
        if isinstance(hostname, str) and hostname.strip():
            updates["hostname"] = hostname.strip()
        # Inventory sections are stored as JSON text; unserialisable sections
        # are skipped silently via _json_or_none.
        inventory = payload.get("inventory") if isinstance(payload.get("inventory"), dict) else {}
        for key in ("memory", "network", "software", "storage", "cpu"):
            if key in inventory and inventory[key] is not None:
                encoded = _json_or_none(inventory[key])
                if encoded is not None:
                    updates[key] = encoded
        metrics = payload.get("metrics") if isinstance(payload.get("metrics"), dict) else {}
        if metrics.get("last_user"):
            updates["last_user"] = str(metrics["last_user"])
        if metrics.get("operating_system"):
            updates["operating_system"] = str(metrics["operating_system"])
        if metrics.get("uptime") is not None:
            try:
                updates["uptime"] = int(metrics["uptime"])
            except Exception:
                pass
        for field in ("external_ip", "internal_ip", "device_type"):
            if payload.get(field):
                updates[field] = str(payload[field])
        conn = db_conn_factory()
        try:
            cur = conn.cursor()

            def _apply_updates() -> int:
                # Resolve the stored GUID case-insensitively, preferring an
                # exact-case match, then UPDATE against that row. When the
                # stored GUID differs from the normalized form, opportunistically
                # rewrite it to the normalized form (ignoring uniqueness
                # conflicts). Returns the UPDATE rowcount.
                if not updates:
                    return 0
                columns = ", ".join(f"{col} = ?" for col in updates.keys())
                values = list(updates.values())
                normalized_guid = normalize_guid(ctx.guid)
                selected_guid: Optional[str] = None
                if normalized_guid:
                    cur.execute(
                        "SELECT guid FROM devices WHERE UPPER(guid) = ?",
                        (normalized_guid,),
                    )
                    rows = cur.fetchall()
                    for (stored_guid,) in rows or []:
                        if stored_guid == ctx.guid:
                            selected_guid = stored_guid
                            break
                    if not selected_guid and rows:
                        selected_guid = rows[0][0]
                target_guid = selected_guid or ctx.guid
                cur.execute(
                    f"UPDATE devices SET {columns} WHERE guid = ?",
                    values + [target_guid],
                )
                updated = cur.rowcount
                if updated > 0 and normalized_guid and target_guid != normalized_guid:
                    try:
                        cur.execute(
                            "UPDATE devices SET guid = ? WHERE guid = ?",
                            (normalized_guid, target_guid),
                        )
                    except sqlite3.IntegrityError:
                        pass
                return updated

            try:
                rowcount = _apply_updates()
            except sqlite3.IntegrityError as exc:
                # Hostname UNIQUE collision: another device row already owns
                # this hostname. Drop the hostname update, retry, and log when
                # the collision involves a different GUID.
                if "devices.hostname" in str(exc) and "UNIQUE" in str(exc).upper():
                    existing_guid_for_hostname: Optional[str] = None
                    if "hostname" in updates:
                        try:
                            cur.execute(
                                "SELECT guid FROM devices WHERE hostname = ?",
                                (updates["hostname"],),
                            )
                            row = cur.fetchone()
                            if row and row[0]:
                                existing_guid_for_hostname = normalize_guid(row[0])
                        except Exception:
                            existing_guid_for_hostname = None
                    updates.pop("hostname", None)
                    rowcount = _apply_updates()
                    try:
                        current_guid = normalize_guid(ctx.guid)
                    except Exception:
                        current_guid = ctx.guid
                    if (
                        existing_guid_for_hostname
                        and current_guid
                        and existing_guid_for_hostname != current_guid
                    ):
                        log(
                            "agents",
                            f"heartbeat hostname collision ignored for guid={ctx.guid}",
                            context_label,
                            level="WARNING",
                        )
                else:
                    raise
            if rowcount == 0:
                # Authenticated GUID has no devices row — the device must
                # (re-)enroll before heartbeats are accepted.
                log("agents", f"heartbeat missing device record guid={ctx.guid}", context_label, level="ERROR")
                return jsonify({"error": "device_not_registered"}), 404
            conn.commit()
        finally:
            conn.close()
        return jsonify({"status": "ok", "poll_after_ms": 15000})

    @blueprint.route("/api/agent/script/request", methods=["POST"])
    @require_device_auth(auth_manager)
    def script_request():
        # Engine runtime does not dispatch scripts via polling yet: always
        # answers idle (or quarantined for non-active devices), advertising the
        # signing key so agents can verify Socket.IO-delivered payloads.
        ctx = _auth_context()
        if ctx is None:
            return jsonify({"error": "auth_context_missing"}), 500
        signing_key = ""
        if script_signer is not None:
            try:
                signing_key = script_signer.public_base64_spki()
            except Exception:
                signing_key = ""
        if ctx.status != "active":
            return jsonify(
                {
                    "status": "quarantined",
                    "poll_after_ms": 60000,
                    "sig_alg": "ed25519",
                    "signing_key": signing_key,
                }
            )
        return jsonify(
            {
                "status": "idle",
                "poll_after_ms": 30000,
                "sig_alg": "ed25519",
                "signing_key": signing_key,
            }
        )

    app.register_blueprint(blueprint)

View File

@@ -1,8 +1,82 @@
# ======================================================
# Data\Engine\services\API\scheduled_jobs\management.py
# Description: Placeholder for scheduled job management endpoints.
# Description: Integrates the legacy job scheduler for CRUD operations within the Engine API.
#
# API Endpoints (if applicable): None
# API Endpoints (if applicable):
# - GET /api/scheduled_jobs (Token Authenticated) - Lists scheduled jobs with summary metadata.
# - POST /api/scheduled_jobs (Token Authenticated) - Creates a new scheduled job definition.
# - GET /api/scheduled_jobs/<int:job_id> (Token Authenticated) - Retrieves a scheduled job.
# - PUT /api/scheduled_jobs/<int:job_id> (Token Authenticated) - Updates a scheduled job.
# - POST /api/scheduled_jobs/<int:job_id>/toggle (Token Authenticated) - Enables or disables a job.
# - DELETE /api/scheduled_jobs/<int:job_id> (Token Authenticated) - Deletes a job.
# - GET /api/scheduled_jobs/<int:job_id>/runs (Token Authenticated) - Lists run history for a job.
# - GET /api/scheduled_jobs/<int:job_id>/devices (Token Authenticated) - Summarises device results for a run.
# - DELETE /api/scheduled_jobs/<int:job_id>/runs (Token Authenticated) - Clears run history for a job.
# ======================================================
"Placeholder for API module scheduled_jobs/management.py."
"""Scheduled job management integration for the Borealis Engine runtime."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
try: # pragma: no cover - legacy module import guard
import job_scheduler as legacy_job_scheduler # type: ignore
except Exception as exc: # pragma: no cover - runtime guard
legacy_job_scheduler = None # type: ignore
_SCHEDULER_IMPORT_ERROR = exc
else:
_SCHEDULER_IMPORT_ERROR = None
if TYPE_CHECKING: # pragma: no cover - typing aide
from flask import Flask
from .. import EngineServiceAdapters
def _raise_scheduler_import() -> None:
    """Fail fast with a clear message when the legacy scheduler import failed."""
    if _SCHEDULER_IMPORT_ERROR is None:
        return
    raise RuntimeError(
        "Legacy job scheduler module could not be imported; ensure Data/Server/job_scheduler.py "
        "remains available during the Engine migration."
    ) from _SCHEDULER_IMPORT_ERROR
def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"):
    """Instantiate the legacy job scheduler and attach it to the Engine context.

    Idempotent: when a scheduler is already attached to the context it is
    returned unchanged.

    Raises:
        RuntimeError: when the legacy module failed to import, or when the
            Engine context has no Socket.IO instance to hand to the scheduler.
    """
    if getattr(adapters.context, "scheduler", None) is not None:
        return adapters.context.scheduler
    _raise_scheduler_import()
    socketio = getattr(adapters.context, "socketio", None)
    if socketio is None:
        raise RuntimeError("Socket.IO instance is required to initialise the scheduled job service.")
    database_path = adapters.context.database_path
    script_signer = adapters.script_signer
    # register() mounts the legacy /api/scheduled_jobs routes on *app* and
    # returns the scheduler; start() launches its background processing.
    scheduler = legacy_job_scheduler.register(
        app,
        socketio,
        database_path,
        script_signer=script_signer,
    )
    scheduler.start()
    adapters.context.scheduler = scheduler
    adapters.service_log("scheduled_jobs", "legacy scheduler initialised", level="INFO")
    return scheduler
def get_scheduler(adapters: "EngineServiceAdapters"):
    """Return the scheduler attached to the Engine context.

    Raises:
        RuntimeError: when ensure_scheduler() has not been called yet.
    """
    instance = getattr(adapters.context, "scheduler", None)
    if instance is not None:
        return instance
    raise RuntimeError("Scheduled job service has not been initialised.")
def register_management(app: "Flask", adapters: "EngineServiceAdapters") -> None:
    """Ensure scheduled job routes are registered via the legacy scheduler.

    The legacy module mounts all /api/scheduled_jobs CRUD endpoints itself
    during initialisation, so this entry point only triggers that setup.
    """
    ensure_scheduler(app, adapters)