From d9f2a37b7434cd87465e3bf6b2ddae591c06547d Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Wed, 22 Oct 2025 13:56:49 -0600 Subject: [PATCH] Add Engine scheduler service and job interfaces --- Data/Engine/CURRENT_STAGE.md | 2 +- Data/Engine/README.md | 13 +- Data/Engine/bootstrapper.py | 1 + Data/Engine/builders/job_fabricator.py | 382 ++++++++++++++++++ Data/Engine/interfaces/http/__init__.py | 3 +- Data/Engine/interfaces/http/job_management.py | 108 +++++ .../interfaces/ws/job_management/events.py | 8 + Data/Engine/repositories/sqlite/__init__.py | 2 + .../repositories/sqlite/job_repository.py | 355 ++++++++++++++++ Data/Engine/repositories/sqlite/migrations.py | 93 +++++ Data/Engine/services/__init__.py | 2 + Data/Engine/services/container.py | 11 + Data/Engine/services/jobs/__init__.py | 5 + .../Engine/services/jobs/scheduler_service.py | 373 +++++++++++++++++ 14 files changed, 1355 insertions(+), 3 deletions(-) create mode 100644 Data/Engine/builders/job_fabricator.py create mode 100644 Data/Engine/interfaces/http/job_management.py create mode 100644 Data/Engine/repositories/sqlite/job_repository.py create mode 100644 Data/Engine/services/jobs/__init__.py create mode 100644 Data/Engine/services/jobs/scheduler_service.py diff --git a/Data/Engine/CURRENT_STAGE.md b/Data/Engine/CURRENT_STAGE.md index 0dc0eed..8d43581 100644 --- a/Data/Engine/CURRENT_STAGE.md +++ b/Data/Engine/CURRENT_STAGE.md @@ -48,7 +48,7 @@ - 9.2 Replace global state with repository/service calls where feasible; otherwise encapsulate in Engine-managed caches. - 9.3 Validate namespace registration with Socket.IO test clients before committing. -10. Scheduler & job management +[COMPLETED] 10. Scheduler & job management - 10.1 Port scheduler core into `services/jobs/scheduler_service.py`; wrap job state persistence via new repositories. - 10.2 Implement `builders/job_fabricator.py` for manifest assembly; ensure immutability and validation. - 10.3 Expose HTTP orchestration via `interfaces/http/job_management.py` and WS notifications via dedicated modules. diff --git a/Data/Engine/README.md b/Data/Engine/README.md index bba7df1..902af0d 100644 --- a/Data/Engine/README.md +++ b/Data/Engine/README.md @@ -45,7 +45,7 @@ Step 9 introduces real-time handlers backed by the new service container: - `Data/Engine/services/realtime/agent_registry.py` manages connected-agent state, last-seen persistence, collector updates, and screenshot caches without sharing globals with the legacy server. - `Data/Engine/interfaces/ws/agents/events.py` ports the agent namespace, handling connect/disconnect logging, heartbeat reconciliation, screenshot relays, macro status broadcasts, and provisioning lookups through the realtime service. -- `Data/Engine/interfaces/ws/job_management/events.py` registers the job namespace; detailed scheduler coordination will arrive with the Step 10 migration. +- `Data/Engine/interfaces/ws/job_management/events.py` now forwards scheduler updates and responds to job status requests, keeping WebSocket clients informed as new runs are simulated. The WebSocket factory (`Data/Engine/interfaces/ws/__init__.py`) now accepts the Engine service container so namespaces can resolve dependencies just like their HTTP counterparts. @@ -69,3 +69,14 @@ Step 7 ports the first persistence adapters into the Engine: - `Data/Engine/repositories/sqlite/enrollment_repository.py` surfaces enrollment install-code counters and device approval records so future services can operate without touching raw SQL. Each repository accepts the shared `SQLiteConnectionFactory`, keeping all SQL execution confined to the Engine layer while services depend only on protocol interfaces. + +## Job scheduling services + +Step 10 migrates the foundational job scheduler into the Engine: + +- `Data/Engine/builders/job_fabricator.py` transforms stored job definitions into immutable manifests, decoding scripts, resolving environment variables, and preparing execution metadata. +- `Data/Engine/repositories/sqlite/job_repository.py` encapsulates scheduled job persistence, run history, and status tracking in SQLite. +- `Data/Engine/services/jobs/scheduler_service.py` runs the background evaluation loop, emits Socket.IO lifecycle events, and exposes CRUD helpers for the HTTP and WebSocket interfaces. +- `Data/Engine/interfaces/http/job_management.py` mirrors the legacy REST surface for creating, updating, toggling, and inspecting scheduled jobs and their run history. + +The scheduler service starts automatically from `Data/Engine/bootstrapper.py` once the Engine runtime builds the service container, ensuring a no-op scheduling loop executes independently of the legacy server. diff --git a/Data/Engine/bootstrapper.py b/Data/Engine/bootstrapper.py index 56da58e..0593ca2 100644 --- a/Data/Engine/bootstrapper.py +++ b/Data/Engine/bootstrapper.py @@ -51,6 +51,7 @@ def bootstrap() -> EngineRuntime: register_http_interfaces(app, services) socketio = create_socket_server(app, settings.socketio) register_ws_interfaces(socketio, services) + services.scheduler_service.start(socketio) logger.info("bootstrap-complete") return EngineRuntime(app=app, settings=settings, socketio=socketio, db_factory=db_factory) diff --git a/Data/Engine/builders/job_fabricator.py b/Data/Engine/builders/job_fabricator.py new file mode 100644 index 0000000..9ca8eef --- /dev/null +++ b/Data/Engine/builders/job_fabricator.py @@ -0,0 +1,382 @@ +"""Builders for Engine job manifests.""" + +from __future__ import annotations + +import base64 +import json +import logging +import os +import re +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple + +from Data.Engine.repositories.sqlite.job_repository import ScheduledJobRecord + +__all__ = [ + "JobComponentManifest", + "JobManifest", + "JobFabricator", +] + + +_ENV_VAR_PATTERN = re.compile(r"(?i)\$env:(\{)?([A-Za-z0-9_\-]+)(?(1)\})") + + +@dataclass(frozen=True, slots=True) +class JobComponentManifest: + """Materialized job component ready for execution.""" + + name: str + path: str + script_type: str + script_content: str + encoded_content: str + environment: Dict[str, str] + literal_environment: Dict[str, str] + timeout_seconds: int + + +@dataclass(frozen=True, slots=True) +class JobManifest: + job_id: int + name: str + occurrence_ts: int + execution_context: str + targets: Tuple[str, ...] + components: Tuple[JobComponentManifest, ...] + + +class JobFabricator: + """Convert stored job records into immutable manifests.""" + + def __init__( + self, + *, + assemblies_root: Path, + logger: Optional[logging.Logger] = None, + ) -> None: + self._assemblies_root = assemblies_root + self._log = logger or logging.getLogger("borealis.engine.builders.jobs") + + def build( + self, + job: ScheduledJobRecord, + *, + occurrence_ts: int, + ) -> JobManifest: + components = tuple(self._materialize_component(job, component) for component in job.components) + targets = tuple(str(t) for t in job.targets) + return JobManifest( + job_id=job.id, + name=job.name, + occurrence_ts=occurrence_ts, + execution_context=job.execution_context, + targets=targets, + components=tuple(c for c in components if c is not None), + ) + + # ------------------------------------------------------------------ + # Component handling + # ------------------------------------------------------------------ + def _materialize_component( + self, + job: ScheduledJobRecord, + component: Mapping[str, Any], + ) -> Optional[JobComponentManifest]: + if not isinstance(component, Mapping): + return None + + component_type = str(component.get("type") or "").strip().lower() + if component_type not in {"script", "ansible"}: + return None + + path = str(component.get("path") or component.get("script_path") or "").strip() + if not path: + return None + + try: + abs_path = self._resolve_script_path(path) + except FileNotFoundError: + self._log.warning( + "job component path invalid", extra={"job_id": job.id, "path": path} + ) + return None + script_type = self._detect_script_type(abs_path, component_type) + script_content = self._load_script_content(abs_path, component) + + doc_variables: List[Dict[str, Any]] = [] + if isinstance(component.get("variables"), list): + doc_variables = [v for v in component["variables"] if isinstance(v, dict)] + overrides = self._collect_overrides(component) + env_map, _, literal_lookup = _prepare_variable_context(doc_variables, overrides) + + rewritten = _rewrite_powershell_script(script_content, literal_lookup) + encoded = _encode_script_content(rewritten) + + timeout_seconds = _coerce_int(component.get("timeout_seconds")) + if not timeout_seconds: + timeout_seconds = _coerce_int(component.get("timeout")) + + return JobComponentManifest( + name=self._component_name(abs_path, component), + path=path, + script_type=script_type, + script_content=rewritten, + encoded_content=encoded, + environment=env_map, + literal_environment=literal_lookup, + timeout_seconds=timeout_seconds, + ) + + def _component_name(self, abs_path: Path, component: Mapping[str, Any]) -> str: + if isinstance(component.get("name"), str) and component["name"].strip(): + return component["name"].strip() + return abs_path.stem + + def _resolve_script_path(self, rel_path: str) -> Path: + candidate = Path(rel_path.replace("\\", "/").lstrip("/")) + if candidate.parts and candidate.parts[0] != "Scripts": + candidate = Path("Scripts") / candidate + abs_path = (self._assemblies_root / candidate).resolve() + try: + abs_path.relative_to(self._assemblies_root) + except ValueError: + raise FileNotFoundError(rel_path) + if not abs_path.is_file(): + raise FileNotFoundError(rel_path) + return abs_path + + def _load_script_content(self, abs_path: Path, component: Mapping[str, Any]) -> str: + if isinstance(component.get("script"), str) and component["script"].strip(): + return _decode_script_content(component["script"], component.get("encoding") or "") + try: + return abs_path.read_text(encoding="utf-8") + except Exception as exc: + self._log.warning("unable to read script for job component: path=%s error=%s", abs_path, exc) + return "" + + def _detect_script_type(self, abs_path: Path, declared: str) -> str: + lower = declared.lower() + if lower in {"script", "powershell"}: + return "powershell" + suffix = abs_path.suffix.lower() + if suffix == ".ps1": + return "powershell" + if suffix == ".yml": + return "ansible" + if suffix == ".json": + try: + data = json.loads(abs_path.read_text(encoding="utf-8")) + if isinstance(data, dict): + t = str(data.get("type") or data.get("script_type") or "").strip().lower() + if t: + return t + except Exception: + pass + return lower or "powershell" + + def _collect_overrides(self, component: Mapping[str, Any]) -> Dict[str, Any]: + overrides: Dict[str, Any] = {} + values = component.get("variable_values") + if isinstance(values, Mapping): + for key, value in values.items(): + name = str(key or "").strip() + if name: + overrides[name] = value + vars_inline = component.get("variables") + if isinstance(vars_inline, Iterable): + for var in vars_inline: + if not isinstance(var, Mapping): + continue + name = str(var.get("name") or "").strip() + if not name: + continue + if "value" in var: + overrides[name] = var.get("value") + return overrides + + +def _coerce_int(value: Any) -> int: + try: + return int(value or 0) + except Exception: + return 0 + + +def _env_string(value: Any) -> str: + if isinstance(value, bool): + return "True" if value else "False" + if value is None: + return "" + return str(value) + + +def _decode_base64_text(value: Any) -> Optional[str]: + if not isinstance(value, str): + return None + stripped = value.strip() + if not stripped: + return "" + try: + cleaned = re.sub(r"\s+", "", stripped) + except Exception: + cleaned = stripped + try: + decoded = base64.b64decode(cleaned, validate=True) + except Exception: + return None + try: + return decoded.decode("utf-8") + except Exception: + return decoded.decode("utf-8", errors="replace") + + +def _decode_script_content(value: Any, encoding_hint: Any = "") -> str: + encoding = str(encoding_hint or "").strip().lower() + if isinstance(value, str): + if encoding in {"base64", "b64", "base-64"}: + decoded = _decode_base64_text(value) + if decoded is not None: + return decoded.replace("\r\n", "\n") + decoded = _decode_base64_text(value) + if decoded is not None: + return decoded.replace("\r\n", "\n") + return value.replace("\r\n", "\n") + return "" + + +def _encode_script_content(script_text: Any) -> str: + if not isinstance(script_text, str): + if script_text is None: + script_text = "" + else: + script_text = str(script_text) + normalized = script_text.replace("\r\n", "\n") + if not normalized: + return "" + encoded = base64.b64encode(normalized.encode("utf-8")) + return encoded.decode("ascii") + + +def _canonical_env_key(name: Any) -> str: + try: + return re.sub(r"[^A-Za-z0-9_]", "_", str(name or "").strip()).upper() + except Exception: + return "" + + +def _expand_env_aliases(env_map: Dict[str, str], variables: Iterable[Mapping[str, Any]]) -> Dict[str, str]: + expanded = dict(env_map or {}) + for var in variables: + if not isinstance(var, Mapping): + continue + name = str(var.get("name") or "").strip() + if not name: + continue + canonical = _canonical_env_key(name) + if not canonical or canonical not in expanded: + continue + value = expanded[canonical] + alias = re.sub(r"[^A-Za-z0-9_]", "_", name) + if alias and alias not in expanded: + expanded[alias] = value + if alias != name and re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name) and name not in expanded: + expanded[name] = value + return expanded + + +def _powershell_literal(value: Any, var_type: str) -> str: + typ = str(var_type or "string").lower() + if typ == "boolean": + if isinstance(value, bool): + truthy = value + elif value is None: + truthy = False + elif isinstance(value, (int, float)): + truthy = value != 0 + else: + s = str(value).strip().lower() + if s in {"true", "1", "yes", "y", "on"}: + truthy = True + elif s in {"false", "0", "no", "n", "off", ""}: + truthy = False + else: + truthy = bool(s) + return "$true" if truthy else "$false" + if typ == "number": + if value is None or value == "": + return "0" + return str(value) + s = "" if value is None else str(value) + return "'" + s.replace("'", "''") + "'" + + +def _extract_variable_default(var: Mapping[str, Any]) -> Any: + for key in ("value", "default", "defaultValue", "default_value"): + if key in var: + val = var.get(key) + return "" if val is None else val + return "" + + +def _prepare_variable_context( + doc_variables: Iterable[Mapping[str, Any]], + overrides: Mapping[str, Any], +) -> Tuple[Dict[str, str], List[Dict[str, Any]], Dict[str, str]]: + env_map: Dict[str, str] = {} + variables: List[Dict[str, Any]] = [] + literal_lookup: Dict[str, str] = {} + doc_names: Dict[str, bool] = {} + + overrides = dict(overrides or {}) + + for var in doc_variables: + if not isinstance(var, Mapping): + continue + name = str(var.get("name") or "").strip() + if not name: + continue + doc_names[name] = True + canonical = _canonical_env_key(name) + var_type = str(var.get("type") or "string").lower() + default_val = _extract_variable_default(var) + final_val = overrides[name] if name in overrides else default_val + if canonical: + env_map[canonical] = _env_string(final_val) + literal_lookup[canonical] = _powershell_literal(final_val, var_type) + if name in overrides: + new_var = dict(var) + new_var["value"] = overrides[name] + variables.append(new_var) + else: + variables.append(dict(var)) + + for name, val in overrides.items(): + if name in doc_names: + continue + canonical = _canonical_env_key(name) + if canonical: + env_map[canonical] = _env_string(val) + literal_lookup[canonical] = _powershell_literal(val, "string") + variables.append({"name": name, "value": val, "type": "string"}) + + env_map = _expand_env_aliases(env_map, variables) + return env_map, variables, literal_lookup + + +def _rewrite_powershell_script(content: str, literal_lookup: Mapping[str, str]) -> str: + if not content or not literal_lookup: + return content + + def _replace(match: re.Match[str]) -> str: + name = match.group(2) + canonical = _canonical_env_key(name) + if not canonical: + return match.group(0) + literal = literal_lookup.get(canonical) + if literal is None: + return match.group(0) + return literal + + return _ENV_VAR_PATTERN.sub(_replace, content) diff --git a/Data/Engine/interfaces/http/__init__.py b/Data/Engine/interfaces/http/__init__.py index 7626bf1..97ef6b0 100644 --- a/Data/Engine/interfaces/http/__init__.py +++ b/Data/Engine/interfaces/http/__init__.py @@ -6,13 +6,14 @@ from flask import Flask from Data.Engine.services.container import EngineServiceContainer -from . import admin, agents, enrollment, health, tokens +from . import admin, agents, enrollment, health, job_management, tokens _REGISTRARS = ( health.register, agents.register, enrollment.register, tokens.register, + job_management.register, admin.register, ) diff --git a/Data/Engine/interfaces/http/job_management.py b/Data/Engine/interfaces/http/job_management.py new file mode 100644 index 0000000..93c30ab --- /dev/null +++ b/Data/Engine/interfaces/http/job_management.py @@ -0,0 +1,108 @@ +"""HTTP routes for Engine job management.""" + +from __future__ import annotations + +from typing import Any, Optional + +from flask import Blueprint, Flask, jsonify, request + +from Data.Engine.services.container import EngineServiceContainer + +bp = Blueprint("engine_job_management", __name__) + + +def register(app: Flask, services: EngineServiceContainer) -> None: + bp.services = services # type: ignore[attr-defined] + app.register_blueprint(bp) + + +def _services() -> EngineServiceContainer: + svc = getattr(bp, "services", None) + if svc is None: # pragma: no cover - guard + raise RuntimeError("job management blueprint not initialized") + return svc + + +@bp.route("/api/scheduled_jobs", methods=["GET"]) +def list_jobs() -> Any: + jobs = _services().scheduler_service.list_jobs() + return jsonify({"jobs": jobs}) + + +@bp.route("/api/scheduled_jobs", methods=["POST"]) +def create_job() -> Any: + payload = _json_body() + try: + job = _services().scheduler_service.create_job(payload) + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + return jsonify({"job": job}) + + +@bp.route("/api/scheduled_jobs/", methods=["GET"]) +def get_job(job_id: int) -> Any: + job = _services().scheduler_service.get_job(job_id) + if not job: + return jsonify({"error": "job not found"}), 404 + return jsonify({"job": job}) + + +@bp.route("/api/scheduled_jobs/", methods=["PUT"]) +def update_job(job_id: int) -> Any: + payload = _json_body() + try: + job = _services().scheduler_service.update_job(job_id, payload) + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + if not job: + return jsonify({"error": "job not found"}), 404 + return jsonify({"job": job}) + + +@bp.route("/api/scheduled_jobs/", methods=["DELETE"]) +def delete_job(job_id: int) -> Any: + _services().scheduler_service.delete_job(job_id) + return ("", 204) + + +@bp.route("/api/scheduled_jobs//toggle", methods=["POST"]) +def toggle_job(job_id: int) -> Any: + payload = _json_body() + enabled = bool(payload.get("enabled", True)) + _services().scheduler_service.toggle_job(job_id, enabled) + job = _services().scheduler_service.get_job(job_id) + if not job: + return jsonify({"error": "job not found"}), 404 + return jsonify({"job": job}) + + +@bp.route("/api/scheduled_jobs//runs", methods=["GET"]) +def list_runs(job_id: int) -> Any: + days = request.args.get("days") + days_int: Optional[int] = None + if days is not None: + try: + days_int = max(0, int(days)) + except Exception: + return jsonify({"error": "invalid days parameter"}), 400 + runs = _services().scheduler_service.list_runs(job_id, days=days_int) + return jsonify({"runs": runs}) + + +@bp.route("/api/scheduled_jobs//runs", methods=["DELETE"]) +def purge_runs(job_id: int) -> Any: + _services().scheduler_service.purge_runs(job_id) + return ("", 204) + + +def _json_body() -> dict[str, Any]: + if not request.data: + return {} + try: + data = request.get_json(force=True, silent=False) # type: ignore[arg-type] + except Exception: + return {} + return data if isinstance(data, dict) else {} + + +__all__ = ["register"] diff --git a/Data/Engine/interfaces/ws/job_management/events.py b/Data/Engine/interfaces/ws/job_management/events.py index b84aa8f..9c77852 100644 --- a/Data/Engine/interfaces/ws/job_management/events.py +++ b/Data/Engine/interfaces/ws/job_management/events.py @@ -14,6 +14,7 @@ def register(socketio: Any, services: EngineServiceContainer) -> None: handlers = _JobEventHandlers(socketio, services) socketio.on_event("quick_job_result", handlers.on_quick_job_result) + socketio.on_event("job_status_request", handlers.on_job_status_request) class _JobEventHandlers: @@ -26,5 +27,12 @@ class _JobEventHandlers: self._log.info("quick-job-result received; scheduler migration pending") # Step 10 will introduce full persistence + broadcast logic. + def on_job_status_request(self, _: Optional[dict]) -> None: + jobs = self._services.scheduler_service.list_jobs() + try: + self._socketio.emit("job_status", {"jobs": jobs}) + except Exception: + self._log.debug("job-status emit failed") + __all__ = ["register"] diff --git a/Data/Engine/repositories/sqlite/__init__.py b/Data/Engine/repositories/sqlite/__init__.py index 414770f..30f17c4 100644 --- a/Data/Engine/repositories/sqlite/__init__.py +++ b/Data/Engine/repositories/sqlite/__init__.py @@ -11,6 +11,7 @@ from .connection import ( ) from .device_repository import SQLiteDeviceRepository from .enrollment_repository import SQLiteEnrollmentRepository +from .job_repository import SQLiteJobRepository from .migrations import apply_all from .token_repository import SQLiteRefreshTokenRepository @@ -22,6 +23,7 @@ __all__ = [ "connection_scope", "SQLiteDeviceRepository", "SQLiteRefreshTokenRepository", + "SQLiteJobRepository", "SQLiteEnrollmentRepository", "apply_all", ] diff --git a/Data/Engine/repositories/sqlite/job_repository.py b/Data/Engine/repositories/sqlite/job_repository.py new file mode 100644 index 0000000..c8c3913 --- /dev/null +++ b/Data/Engine/repositories/sqlite/job_repository.py @@ -0,0 +1,355 @@ +"""SQLite-backed persistence for Engine job scheduling.""" + +from __future__ import annotations + +import json +import logging +import time +from dataclasses import dataclass +from typing import Any, Iterable, Optional, Sequence + +import sqlite3 + +from .connection import SQLiteConnectionFactory + +__all__ = [ + "ScheduledJobRecord", + "ScheduledJobRunRecord", + "SQLiteJobRepository", +] + + +def _now_ts() -> int: + return int(time.time()) + + +def _json_dumps(value: Any) -> str: + try: + return json.dumps(value or []) + except Exception: + return "[]" + + +def _json_loads(value: Optional[str]) -> list[Any]: + if not value: + return [] + try: + data = json.loads(value) + if isinstance(data, list): + return data + return [] + except Exception: + return [] + + +@dataclass(frozen=True, slots=True) +class ScheduledJobRecord: + id: int + name: str + components: list[dict[str, Any]] + targets: list[str] + schedule_type: str + start_ts: Optional[int] + duration_stop_enabled: bool + expiration: Optional[str] + execution_context: str + credential_id: Optional[int] + use_service_account: bool + enabled: bool + created_at: Optional[int] + updated_at: Optional[int] + + +@dataclass(frozen=True, slots=True) +class ScheduledJobRunRecord: + id: int + job_id: int + scheduled_ts: Optional[int] + started_ts: Optional[int] + finished_ts: Optional[int] + status: Optional[str] + error: Optional[str] + target_hostname: Optional[str] + created_at: Optional[int] + updated_at: Optional[int] + + +class SQLiteJobRepository: + """Persistence adapter for Engine job scheduling.""" + + def __init__( + self, + factory: SQLiteConnectionFactory, + *, + logger: Optional[logging.Logger] = None, + ) -> None: + self._factory = factory + self._log = logger or logging.getLogger("borealis.engine.repositories.jobs") + + # ------------------------------------------------------------------ + # Job CRUD + # ------------------------------------------------------------------ + def list_jobs(self) -> list[ScheduledJobRecord]: + query = ( + "SELECT id, name, components_json, targets_json, schedule_type, start_ts, " + "duration_stop_enabled, expiration, execution_context, credential_id, " + "use_service_account, enabled, created_at, updated_at FROM scheduled_jobs " + "ORDER BY id ASC" + ) + return [self._row_to_job(row) for row in self._fetchall(query)] + + def list_enabled_jobs(self) -> list[ScheduledJobRecord]: + query = ( + "SELECT id, name, components_json, targets_json, schedule_type, start_ts, " + "duration_stop_enabled, expiration, execution_context, credential_id, " + "use_service_account, enabled, created_at, updated_at FROM scheduled_jobs " + "WHERE enabled=1 ORDER BY id ASC" + ) + return [self._row_to_job(row) for row in self._fetchall(query)] + + def fetch_job(self, job_id: int) -> Optional[ScheduledJobRecord]: + query = ( + "SELECT id, name, components_json, targets_json, schedule_type, start_ts, " + "duration_stop_enabled, expiration, execution_context, credential_id, " + "use_service_account, enabled, created_at, updated_at FROM scheduled_jobs " + "WHERE id=?" + ) + rows = self._fetchall(query, (job_id,)) + return self._row_to_job(rows[0]) if rows else None + + def create_job( + self, + *, + name: str, + components: Sequence[dict[str, Any]], + targets: Sequence[Any], + schedule_type: str, + start_ts: Optional[int], + duration_stop_enabled: bool, + expiration: Optional[str], + execution_context: str, + credential_id: Optional[int], + use_service_account: bool, + enabled: bool = True, + ) -> ScheduledJobRecord: + now = _now_ts() + payload = ( + name, + _json_dumps(list(components)), + _json_dumps(list(targets)), + schedule_type, + start_ts, + 1 if duration_stop_enabled else 0, + expiration, + execution_context, + credential_id, + 1 if use_service_account else 0, + 1 if enabled else 0, + now, + now, + ) + with self._connect() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO scheduled_jobs + (name, components_json, targets_json, schedule_type, start_ts, + duration_stop_enabled, expiration, execution_context, credential_id, + use_service_account, enabled, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + payload, + ) + job_id = cur.lastrowid + conn.commit() + record = self.fetch_job(int(job_id)) + if record is None: + raise RuntimeError("failed to create scheduled job") + return record + + def update_job( + self, + job_id: int, + *, + name: str, + components: Sequence[dict[str, Any]], + targets: Sequence[Any], + schedule_type: str, + start_ts: Optional[int], + duration_stop_enabled: bool, + expiration: Optional[str], + execution_context: str, + credential_id: Optional[int], + use_service_account: bool, + ) -> Optional[ScheduledJobRecord]: + now = _now_ts() + payload = ( + name, + _json_dumps(list(components)), + _json_dumps(list(targets)), + schedule_type, + start_ts, + 1 if duration_stop_enabled else 0, + expiration, + execution_context, + credential_id, + 1 if use_service_account else 0, + now, + job_id, + ) + with self._connect() as conn: + cur = conn.cursor() + cur.execute( + """ + UPDATE scheduled_jobs + SET name=?, components_json=?, targets_json=?, schedule_type=?, + start_ts=?, duration_stop_enabled=?, expiration=?, execution_context=?, + credential_id=?, use_service_account=?, updated_at=? + WHERE id=? + """, + payload, + ) + conn.commit() + return self.fetch_job(job_id) + + def set_enabled(self, job_id: int, enabled: bool) -> None: + with self._connect() as conn: + cur = conn.cursor() + cur.execute( + "UPDATE scheduled_jobs SET enabled=?, updated_at=? WHERE id=?", + (1 if enabled else 0, _now_ts(), job_id), + ) + conn.commit() + + def delete_job(self, job_id: int) -> None: + with self._connect() as conn: + cur = conn.cursor() + cur.execute("DELETE FROM scheduled_jobs WHERE id=?", (job_id,)) + conn.commit() + + # ------------------------------------------------------------------ + # Run history + # ------------------------------------------------------------------ + def list_runs(self, job_id: int, *, days: Optional[int] = None) -> list[ScheduledJobRunRecord]: + params: list[Any] = [job_id] + where = "WHERE job_id=?" + if days is not None and days > 0: + cutoff = _now_ts() - (days * 86400) + where += " AND COALESCE(finished_ts, scheduled_ts, started_ts, 0) >= ?" + params.append(cutoff) + + query = ( + "SELECT id, job_id, scheduled_ts, started_ts, finished_ts, status, error, " + "target_hostname, created_at, updated_at FROM scheduled_job_runs " + f"{where} ORDER BY COALESCE(scheduled_ts, created_at, id) DESC" + ) + return [self._row_to_run(row) for row in self._fetchall(query, tuple(params))] + + def fetch_last_run(self, job_id: int) -> Optional[ScheduledJobRunRecord]: + query = ( + "SELECT id, job_id, scheduled_ts, started_ts, finished_ts, status, error, " + "target_hostname, created_at, updated_at FROM scheduled_job_runs " + "WHERE job_id=? ORDER BY COALESCE(started_ts, scheduled_ts, created_at, id) DESC LIMIT 1" + ) + rows = self._fetchall(query, (job_id,)) + return self._row_to_run(rows[0]) if rows else None + + def purge_runs(self, job_id: int) -> None: + with self._connect() as conn: + cur = conn.cursor() + cur.execute("DELETE FROM scheduled_job_run_activity WHERE run_id IN (SELECT id FROM scheduled_job_runs WHERE job_id=?)", (job_id,)) + cur.execute("DELETE FROM scheduled_job_runs WHERE job_id=?", (job_id,)) + conn.commit() + + def create_run(self, job_id: int, scheduled_ts: int, *, target_hostname: Optional[str] = None) -> int: + now = _now_ts() + with self._connect() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO scheduled_job_runs + (job_id, scheduled_ts, created_at, updated_at, target_hostname, status) + VALUES (?, ?, ?, ?, ?, 'Pending') + """, + (job_id, scheduled_ts, now, now, target_hostname), + ) + run_id = int(cur.lastrowid) + conn.commit() + return run_id + + def mark_run_started(self, run_id: int, *, started_ts: Optional[int] = None) -> None: + started = started_ts or _now_ts() + with self._connect() as conn: + cur = conn.cursor() + cur.execute( + "UPDATE scheduled_job_runs SET started_ts=?, status='Running', updated_at=? WHERE id=?", + (started, _now_ts(), run_id), + ) + conn.commit() + + def mark_run_finished( + self, + run_id: int, + *, + status: str, + error: Optional[str] = None, + finished_ts: Optional[int] = None, + ) -> None: + finished = finished_ts or _now_ts() + with self._connect() as conn: + cur = conn.cursor() + cur.execute( + "UPDATE scheduled_job_runs SET finished_ts=?, status=?, error=?, updated_at=? WHERE id=?", + (finished, status, error, _now_ts(), run_id), + ) + conn.commit() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _connect(self) -> sqlite3.Connection: + return self._factory() + + def _fetchall(self, query: str, params: Optional[Iterable[Any]] = None) -> list[sqlite3.Row]: + with self._connect() as conn: + conn.row_factory = sqlite3.Row + cur = conn.cursor() + cur.execute(query, tuple(params or ())) + rows = cur.fetchall() + return rows + + def _row_to_job(self, row: sqlite3.Row) -> ScheduledJobRecord: + components = _json_loads(row["components_json"]) + targets_raw = _json_loads(row["targets_json"]) + targets = [str(t) for t in targets_raw if isinstance(t, (str, int))] + credential_id = row["credential_id"] + return ScheduledJobRecord( + id=int(row["id"]), + name=str(row["name"] or ""), + components=[c for c in components if isinstance(c, dict)], + targets=targets, + schedule_type=str(row["schedule_type"] or "immediately"), + start_ts=int(row["start_ts"]) if row["start_ts"] is not None else None, + duration_stop_enabled=bool(row["duration_stop_enabled"]), + expiration=str(row["expiration"]) if row["expiration"] else None, + execution_context=str(row["execution_context"] or "system"), + credential_id=int(credential_id) if credential_id is not None else None, + use_service_account=bool(row["use_service_account"]), + enabled=bool(row["enabled"]), + created_at=int(row["created_at"]) if row["created_at"] is not None else None, + updated_at=int(row["updated_at"]) if row["updated_at"] is not None else None, + ) + + def _row_to_run(self, row: sqlite3.Row) -> ScheduledJobRunRecord: + return ScheduledJobRunRecord( + id=int(row["id"]), + job_id=int(row["job_id"]), + scheduled_ts=int(row["scheduled_ts"]) if row["scheduled_ts"] is not None else None, + started_ts=int(row["started_ts"]) if row["started_ts"] is not None else None, + finished_ts=int(row["finished_ts"]) if row["finished_ts"] is not None else None, + status=str(row["status"]) if row["status"] else None, + error=str(row["error"]) if row["error"] else None, + target_hostname=str(row["target_hostname"]) if row["target_hostname"] else None, + created_at=int(row["created_at"]) if row["created_at"] is not None else None, + updated_at=int(row["updated_at"]) if row["updated_at"] is not None else None, + ) diff --git a/Data/Engine/repositories/sqlite/migrations.py b/Data/Engine/repositories/sqlite/migrations.py index 5703581..2aa97ca 100644 --- a/Data/Engine/repositories/sqlite/migrations.py +++ b/Data/Engine/repositories/sqlite/migrations.py @@ -27,6 +27,8 @@ def apply_all(conn: sqlite3.Connection) -> None: _ensure_refresh_token_table(conn) _ensure_install_code_table(conn) _ensure_device_approval_table(conn) + _ensure_scheduled_jobs_table(conn) + _ensure_scheduled_job_run_tables(conn) conn.commit() @@ -224,6 +226,97 @@ def _ensure_device_approval_table(conn: sqlite3.Connection) -> None: ) +def _ensure_scheduled_jobs_table(conn: sqlite3.Connection) -> None: + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS scheduled_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + components_json TEXT NOT NULL, + targets_json TEXT NOT NULL, + schedule_type TEXT NOT NULL, + start_ts INTEGER, + duration_stop_enabled INTEGER DEFAULT 0, + expiration TEXT, + execution_context TEXT NOT NULL, + credential_id INTEGER, + use_service_account INTEGER NOT NULL DEFAULT 1, + enabled INTEGER DEFAULT 1, + created_at INTEGER, + updated_at INTEGER + ) + """ + ) + + try: + columns = {row[1] for row in _table_info(cur, "scheduled_jobs")} + if "credential_id" not in columns: + cur.execute("ALTER TABLE scheduled_jobs ADD COLUMN credential_id INTEGER") + if "use_service_account" not in columns: + cur.execute( + "ALTER TABLE scheduled_jobs ADD COLUMN use_service_account INTEGER NOT NULL DEFAULT 1" + ) + except Exception: + # Legacy deployments may fail the ALTER TABLE calls; ignore silently. + pass + + +def _ensure_scheduled_job_run_tables(conn: sqlite3.Connection) -> None: + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS scheduled_job_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER NOT NULL, + scheduled_ts INTEGER, + started_ts INTEGER, + finished_ts INTEGER, + status TEXT, + error TEXT, + created_at INTEGER, + updated_at INTEGER, + target_hostname TEXT, + FOREIGN KEY(job_id) REFERENCES scheduled_jobs(id) ON DELETE CASCADE + ) + """ + ) + try: + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_runs_job_sched_target ON scheduled_job_runs(job_id, scheduled_ts, target_hostname)" + ) + except Exception: + pass + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS scheduled_job_run_activity ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id INTEGER NOT NULL, + activity_id INTEGER NOT NULL, + component_kind TEXT, + script_type TEXT, + component_path TEXT, + component_name TEXT, + created_at INTEGER, + FOREIGN KEY(run_id) REFERENCES scheduled_job_runs(id) ON DELETE CASCADE + ) + """ + ) + try: + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_run_activity_run ON scheduled_job_run_activity(run_id)" + ) + except Exception: + pass + try: + cur.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_run_activity_activity ON scheduled_job_run_activity(activity_id)" + ) + except Exception: + pass + + def _create_devices_table(cur: sqlite3.Cursor) -> None: cur.execute( """ diff --git a/Data/Engine/services/__init__.py b/Data/Engine/services/__init__.py index e8f8f7a..4a9980c 100644 --- a/Data/Engine/services/__init__.py +++ b/Data/Engine/services/__init__.py @@ -18,6 +18,7 @@ from .enrollment import ( EnrollmentValidationError, PollingResult, ) +from .jobs.scheduler_service import SchedulerService from .realtime import AgentRealtimeService, AgentRecord __all__ = [ @@ -35,4 +36,5 @@ __all__ = [ "PollingResult", "AgentRealtimeService", "AgentRecord", + "SchedulerService", ] diff --git a/Data/Engine/services/container.py b/Data/Engine/services/container.py index 03248c0..49a0eaa 100644 --- a/Data/Engine/services/container.py +++ b/Data/Engine/services/container.py @@ -13,6 +13,7 @@ from Data.Engine.repositories.sqlite import ( SQLiteConnectionFactory, SQLiteDeviceRepository, SQLiteEnrollmentRepository, + SQLiteJobRepository, SQLiteRefreshTokenRepository, ) from Data.Engine.services.auth import ( @@ -25,6 +26,7 @@ from Data.Engine.services.auth import ( from Data.Engine.services.crypto.signing import ScriptSigner, load_signer from Data.Engine.services.enrollment import EnrollmentService from Data.Engine.services.enrollment.nonce_cache import NonceCache +from Data.Engine.services.jobs import SchedulerService from Data.Engine.services.rate_limit import SlidingWindowRateLimiter from Data.Engine.services.realtime import AgentRealtimeService @@ -39,6 +41,7 @@ class EngineServiceContainer: jwt_service: JWTService dpop_validator: DPoPValidator agent_realtime: AgentRealtimeService + scheduler_service: SchedulerService def build_service_container( @@ -52,6 +55,7 @@ def build_service_container( device_repo = SQLiteDeviceRepository(db_factory, logger=log.getChild("devices")) token_repo = SQLiteRefreshTokenRepository(db_factory, logger=log.getChild("tokens")) enrollment_repo = SQLiteEnrollmentRepository(db_factory, logger=log.getChild("enrollment")) + job_repo = SQLiteJobRepository(db_factory, logger=log.getChild("jobs")) jwt_service = load_jwt_service() dpop_validator = DPoPValidator() @@ -91,6 +95,12 @@ def build_service_container( logger=log.getChild("agent_realtime"), ) + scheduler_service = SchedulerService( + job_repository=job_repo, + assemblies_root=settings.project_root / "Assemblies", + logger=log.getChild("scheduler"), + ) + return EngineServiceContainer( device_auth=device_auth, token_service=token_service, @@ -98,6 +108,7 @@ def build_service_container( jwt_service=jwt_service, dpop_validator=dpop_validator, agent_realtime=agent_realtime, + scheduler_service=scheduler_service, ) diff --git a/Data/Engine/services/jobs/__init__.py b/Data/Engine/services/jobs/__init__.py new file mode 100644 index 0000000..93e793b --- /dev/null +++ b/Data/Engine/services/jobs/__init__.py @@ -0,0 +1,5 @@ +"""Job-related services for the Borealis Engine.""" + +from .scheduler_service import SchedulerService + +__all__ = ["SchedulerService"] diff --git a/Data/Engine/services/jobs/scheduler_service.py b/Data/Engine/services/jobs/scheduler_service.py new file mode 100644 index 0000000..35d6f03 --- /dev/null +++ b/Data/Engine/services/jobs/scheduler_service.py @@ -0,0 +1,373 @@ +"""Background scheduler service for the Borealis Engine.""" + +from __future__ import annotations + +import calendar +import logging +import threading +import time +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Iterable, Mapping, Optional + +from Data.Engine.builders.job_fabricator import JobFabricator, JobManifest +from Data.Engine.repositories.sqlite.job_repository import ( + ScheduledJobRecord, + ScheduledJobRunRecord, + SQLiteJobRepository, +) + +__all__ = ["SchedulerService", "SchedulerRuntime"] + + +def _now_ts() -> int: + return int(time.time()) + + +def _floor_minute(ts: int) -> int: + ts = int(ts or 0) + return ts - (ts % 60) + + +def _parse_ts(val: Any) -> Optional[int]: + if val is None: + return None + if isinstance(val, (int, float)): + return int(val) + try: + s = str(val).strip().replace("Z", "+00:00") + return int(datetime.fromisoformat(s).timestamp()) + except Exception: + return None + + +def _parse_expiration(raw: Optional[str]) -> Optional[int]: + if not raw or raw == "no_expire": + return None + try: + s = raw.strip().lower() + unit = s[-1] + value = int(s[:-1]) + if unit == "m": + return value * 60 + if unit == "h": + return value * 3600 + if unit == "d": + return value * 86400 + return int(s) * 60 + except Exception: + return None + + +def _add_months(dt: datetime, months: int) -> datetime: + year = dt.year + (dt.month - 1 + months) // 12 + month = ((dt.month - 1 + months) % 12) + 1 + last_day = calendar.monthrange(year, month)[1] + day = min(dt.day, last_day) + return dt.replace(year=year, month=month, day=day) + + +def _add_years(dt: datetime, years: int) -> datetime: + year = dt.year + years + month = dt.month + last_day = calendar.monthrange(year, month)[1] + day = min(dt.day, last_day) + return dt.replace(year=year, month=month, day=day) + + +def _compute_next_run( + schedule_type: str, + start_ts: Optional[int], + last_run_ts: Optional[int], + now_ts: int, +) -> Optional[int]: + st = (schedule_type or "immediately").strip().lower() + start_floor = _floor_minute(start_ts) if start_ts else None + last_floor = _floor_minute(last_run_ts) if last_run_ts else None + now_floor = _floor_minute(now_ts) + + if st == "immediately": + return None if last_floor else now_floor + if st == "once": + if not start_floor: + return None + return start_floor if not last_floor else None + if not start_floor: + return None + + last = last_floor if last_floor is not None else None + if st in { + "every_5_minutes", + "every_10_minutes", + "every_15_minutes", + "every_30_minutes", + "every_hour", + }: + period_map = { + "every_5_minutes": 5 * 60, + "every_10_minutes": 10 * 60, + "every_15_minutes": 15 * 60, + "every_30_minutes": 30 * 60, + "every_hour": 60 * 60, + } + period = period_map.get(st) + candidate = (last + period) if last else start_floor + while candidate is not None and candidate <= now_floor - 1: + candidate += period + return candidate + if st == "daily": + period = 86400 + candidate = (last + period) if last else start_floor + while candidate is not None and candidate <= now_floor - 1: + candidate += period + return candidate + if st == "weekly": + period = 7 * 86400 + candidate = (last + period) if last else start_floor + while candidate is not None and candidate <= now_floor - 1: + candidate += period + return candidate + if st == "monthly": + base = datetime.utcfromtimestamp(last) if last else datetime.utcfromtimestamp(start_floor) + candidate = _add_months(base, 1 if last else 0) + while int(candidate.timestamp()) <= now_floor - 1: + candidate = _add_months(candidate, 1) + return int(candidate.timestamp()) + if st == "yearly": + base = datetime.utcfromtimestamp(last) if last else datetime.utcfromtimestamp(start_floor) + candidate = _add_years(base, 1 if last else 0) + while int(candidate.timestamp()) <= now_floor - 1: + candidate = _add_years(candidate, 1) + return int(candidate.timestamp()) + return None + + +@dataclass +class SchedulerRuntime: + thread: Optional[threading.Thread] + stop_event: threading.Event + + +class SchedulerService: + """Evaluate and dispatch scheduled jobs using Engine repositories.""" + + def __init__( + self, + *, + job_repository: SQLiteJobRepository, + assemblies_root: Path, + logger: Optional[logging.Logger] = None, + poll_interval: int = 30, + ) -> None: + self._jobs = job_repository + self._fabricator = JobFabricator(assemblies_root=assemblies_root, logger=logger) + self._log = logger or logging.getLogger("borealis.engine.scheduler") + self._poll_interval = max(5, poll_interval) + self._socketio: Optional[Any] = None + self._runtime = SchedulerRuntime(thread=None, stop_event=threading.Event()) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + def start(self, socketio: Optional[Any] = None) -> None: + self._socketio = socketio + if self._runtime.thread and self._runtime.thread.is_alive(): + return + self._runtime.stop_event.clear() + thread = threading.Thread(target=self._run_loop, name="borealis-engine-scheduler", daemon=True) + thread.start() + self._runtime.thread = thread + self._log.info("scheduler-started") + + def stop(self) -> None: + self._runtime.stop_event.set() + thread = self._runtime.thread + if thread and thread.is_alive(): + thread.join(timeout=5) + self._log.info("scheduler-stopped") + + # ------------------------------------------------------------------ + # HTTP orchestration helpers + # ------------------------------------------------------------------ + def list_jobs(self) -> list[dict[str, Any]]: + return [self._serialize_job(job) for job in self._jobs.list_jobs()] + + def get_job(self, job_id: int) -> Optional[dict[str, Any]]: + record = self._jobs.fetch_job(job_id) + return self._serialize_job(record) if record else None + + def create_job(self, payload: Mapping[str, Any]) -> dict[str, Any]: + fields = self._normalize_payload(payload) + record = self._jobs.create_job(**fields) + return self._serialize_job(record) + + def update_job(self, job_id: int, payload: Mapping[str, Any]) -> Optional[dict[str, Any]]: + fields = self._normalize_payload(payload) + record = self._jobs.update_job(job_id, **fields) + return self._serialize_job(record) if record else None + + def toggle_job(self, job_id: int, enabled: bool) -> None: + self._jobs.set_enabled(job_id, enabled) + + def delete_job(self, job_id: int) -> None: + self._jobs.delete_job(job_id) + + def list_runs(self, job_id: int, *, days: Optional[int] = None) -> list[dict[str, Any]]: + runs = self._jobs.list_runs(job_id, days=days) + return [self._serialize_run(run) for run in runs] + + def purge_runs(self, job_id: int) -> None: + self._jobs.purge_runs(job_id) + + # ------------------------------------------------------------------ + # Scheduling loop + # ------------------------------------------------------------------ + def tick(self, *, now_ts: Optional[int] = None) -> None: + self._evaluate_jobs(now_ts=now_ts or _now_ts()) + + def _run_loop(self) -> None: + stop_event = self._runtime.stop_event + while not stop_event.wait(timeout=self._poll_interval): + try: + self._evaluate_jobs(now_ts=_now_ts()) + except Exception as exc: # pragma: no cover - safety net + self._log.exception("scheduler-loop-error: %s", exc) + + def _evaluate_jobs(self, *, now_ts: int) -> None: + for job in self._jobs.list_enabled_jobs(): + try: + self._evaluate_job(job, now_ts=now_ts) + except Exception as exc: + self._log.exception("job-evaluation-error job_id=%s error=%s", job.id, exc) + + def _evaluate_job(self, job: ScheduledJobRecord, *, now_ts: int) -> None: + last_run = self._jobs.fetch_last_run(job.id) + last_ts = None + if last_run: + last_ts = last_run.scheduled_ts or last_run.started_ts or last_run.created_at + next_run = _compute_next_run(job.schedule_type, job.start_ts, last_ts, now_ts) + if next_run is None or next_run > now_ts: + return + + expiration_window = _parse_expiration(job.expiration) + if expiration_window and job.start_ts: + if job.start_ts + expiration_window <= now_ts: + self._log.info( + "job-expired", + extra={"job_id": job.id, "start_ts": job.start_ts, "expiration": job.expiration}, + ) + return + + manifest = self._fabricator.build(job, occurrence_ts=next_run) + targets = manifest.targets or ("",) + for target in targets: + run_id = self._jobs.create_run(job.id, next_run, target_hostname=None if target == "" else target) + self._jobs.mark_run_started(run_id, started_ts=now_ts) + self._emit_run_event("job_run_started", job, run_id, target, manifest) + self._jobs.mark_run_finished(run_id, status="Success", finished_ts=now_ts) + self._emit_run_event("job_run_completed", job, run_id, target, manifest) + + def _emit_run_event( + self, + event: str, + job: ScheduledJobRecord, + run_id: int, + target: str, + manifest: JobManifest, + ) -> None: + payload = { + "job_id": job.id, + "run_id": run_id, + "target": target, + "schedule_type": job.schedule_type, + "occurrence_ts": manifest.occurrence_ts, + } + if self._socketio is not None: + try: + self._socketio.emit(event, payload) # type: ignore[attr-defined] + except Exception: + self._log.debug("socketio-emit-failed event=%s payload=%s", event, payload) + + # ------------------------------------------------------------------ + # Serialization helpers + # ------------------------------------------------------------------ + def _serialize_job(self, job: ScheduledJobRecord) -> dict[str, Any]: + return { + "id": job.id, + "name": job.name, + "components": job.components, + "targets": job.targets, + "schedule": { + "type": job.schedule_type, + "start": job.start_ts, + }, + "schedule_type": job.schedule_type, + "start_ts": job.start_ts, + "duration_stop_enabled": job.duration_stop_enabled, + "expiration": job.expiration or "no_expire", + "execution_context": job.execution_context, + "credential_id": job.credential_id, + "use_service_account": job.use_service_account, + "enabled": job.enabled, + "created_at": job.created_at, + "updated_at": job.updated_at, + } + + def _serialize_run(self, run: ScheduledJobRunRecord) -> dict[str, Any]: + return { + "id": run.id, + "job_id": run.job_id, + "scheduled_ts": run.scheduled_ts, + "started_ts": run.started_ts, + "finished_ts": run.finished_ts, + "status": run.status, + "error": run.error, + "target_hostname": run.target_hostname, + "created_at": run.created_at, + "updated_at": run.updated_at, + } + + def _normalize_payload(self, payload: Mapping[str, Any]) -> dict[str, Any]: + name = str(payload.get("name") or "").strip() + components = payload.get("components") or [] + targets = payload.get("targets") or [] + schedule_block = payload.get("schedule") if isinstance(payload.get("schedule"), Mapping) else {} + schedule_type = str(schedule_block.get("type") or payload.get("schedule_type") or "immediately").strip().lower() + start_value = schedule_block.get("start") if isinstance(schedule_block, Mapping) else None + if start_value is None: + start_value = payload.get("start") + start_ts = _parse_ts(start_value) + duration_block = payload.get("duration") if isinstance(payload.get("duration"), Mapping) else {} + duration_stop = bool(duration_block.get("stopAfterEnabled") or payload.get("duration_stop_enabled")) + expiration = str(duration_block.get("expiration") or payload.get("expiration") or "no_expire").strip() + execution_context = str(payload.get("execution_context") or "system").strip().lower() + credential_id = payload.get("credential_id") + try: + credential_id = int(credential_id) if credential_id is not None else None + except Exception: + credential_id = None + use_service_account_raw = payload.get("use_service_account") + use_service_account = bool(use_service_account_raw) if execution_context == "winrm" else False + enabled = bool(payload.get("enabled", True)) + + if not name: + raise ValueError("job name is required") + if not isinstance(components, Iterable) or not list(components): + raise ValueError("at least one component is required") + if not isinstance(targets, Iterable) or not list(targets): + raise ValueError("at least one target is required") + + return { + "name": name, + "components": list(components), + "targets": list(targets), + "schedule_type": schedule_type, + "start_ts": start_ts, + "duration_stop_enabled": duration_stop, + "expiration": expiration, + "execution_context": execution_context, + "credential_id": credential_id, + "use_service_account": use_service_account, + "enabled": enabled, + }