diff --git a/Borealis.ps1 b/Borealis.ps1 index b4df5d6f..127901f6 100644 --- a/Borealis.ps1 +++ b/Borealis.ps1 @@ -233,31 +233,31 @@ function Ensure-EngineWebInterface { [string]$ProjectRoot ) - $engineSource = Join-Path $ProjectRoot 'Engine\web-interface' - $legacySource = Join-Path $ProjectRoot 'Data\Server\WebUI' + $engineDestination = Join-Path $ProjectRoot 'Engine\web-interface' + $engineStageSource = Join-Path $ProjectRoot 'Data\Engine\web-interface' - if (-not (Test-Path $legacySource)) { - return + if (-not (Test-Path $engineStageSource)) { + throw "Engine web interface source missing at '$engineStageSource'." } - $enginePackage = Join-Path $engineSource 'package.json' - $engineSrcDir = Join-Path $engineSource 'src' + $enginePackage = Join-Path $engineDestination 'package.json' + $engineSrcDir = Join-Path $engineDestination 'src' if ((Test-Path $enginePackage) -and (Test-Path $engineSrcDir)) { return } - if (-not (Test-Path $engineSource)) { - New-Item -Path $engineSource -ItemType Directory -Force | Out-Null + if (-not (Test-Path $engineDestination)) { + New-Item -Path $engineDestination -ItemType Directory -Force | Out-Null } $preserve = @('.gitignore','README.md') - Get-ChildItem -Path $engineSource -Force | Where-Object { $preserve -notcontains $_.Name } | + Get-ChildItem -Path $engineDestination -Force | Where-Object { $preserve -notcontains $_.Name } | Remove-Item -Recurse -Force -ErrorAction SilentlyContinue - Copy-Item (Join-Path $legacySource '*') $engineSource -Recurse -Force + Copy-Item (Join-Path $engineStageSource '*') $engineDestination -Recurse -Force if (-not (Test-Path $enginePackage)) { - throw "Failed to stage Engine web interface into '$engineSource'." + throw "Failed to stage Engine web interface into '$engineDestination' from '$engineStageSource'." } } @@ -1415,7 +1415,7 @@ switch ($choice) { $dataSource = "Data" $engineSource = "$dataSource\Engine" $engineDataDestination = "$venvFolder\Data\Engine" - $webUIFallbackSource = "$dataSource\Server\web-interface" + $webUIFallbackSource = "$dataSource\Engine\web-interface" $webUIDestination = "$venvFolder\web-interface" $venvPython = Join-Path $venvFolder 'Scripts\python.exe' $engineSourceAbsolute = Join-Path $scriptDir $engineSource diff --git a/Borealis.sh b/Borealis.sh index 2b5c24b8..3b72154c 100644 --- a/Borealis.sh +++ b/Borealis.sh @@ -332,20 +332,20 @@ PY ensure_engine_webui_source() { local engineSource="Engine/web-interface" - local legacySource="Data/Server/WebUI" if [[ -d "${engineSource}/src" && -f "${engineSource}/package.json" ]]; then return 0 fi - if [[ ! -d "$legacySource" ]]; then - echo "${RED}Legacy WebUI source '$legacySource' not found.${RESET}" >&2 + local stageSource="Data/Engine/web-interface" + if [[ ! -d "$stageSource" ]]; then + echo "${RED}Engine web interface source '$stageSource' not found.${RESET}" >&2 return 1 fi mkdir -p "$engineSource" find "$engineSource" -mindepth 1 -maxdepth 1 \ ! -name '.gitignore' ! -name 'README.md' -exec rm -rf {} + - cp -a "$legacySource/." "$engineSource/" + cp -a "$stageSource/." "$engineSource/" if [[ ! -f "${engineSource}/package.json" ]]; then - echo "${RED}Failed to stage Engine web interface into '$engineSource'.${RESET}" >&2 + echo "${RED}Failed to stage Engine web interface into '$engineSource' from '$stageSource'.${RESET}" >&2 return 1 fi } diff --git a/Data/Engine/Assemblies/Scripts/Borealis/Migrate_Agent_to_Different_Borealis_Server.json b/Data/Engine/Assemblies/Scripts/Borealis/Migrate_Agent_to_Different_Borealis_Server.json index 97dd2752..c0691518 100644 --- a/Data/Engine/Assemblies/Scripts/Borealis/Migrate_Agent_to_Different_Borealis_Server.json +++ b/Data/Engine/Assemblies/Scripts/Borealis/Migrate_Agent_to_Different_Borealis_Server.json @@ -15,7 +15,7 @@ "name": "server_url", "label": "Borealis Server URL", "type": "string", - "default": "http://localhost:5000", + "default": "https://localhost:5000", "required": true, "description": "URL of where the agent is going to reach-out to moving forward." } diff --git a/Data/Engine/bootstrapper.py b/Data/Engine/bootstrapper.py index 3a5e9248..5761b305 100644 --- a/Data/Engine/bootstrapper.py +++ b/Data/Engine/bootstrapper.py @@ -77,11 +77,11 @@ def _stage_web_interface_assets(logger: Optional[logging.Logger] = None, *, forc project_root = _project_root() engine_web_root = project_root / "Engine" / "web-interface" - legacy_source = project_root / "Data" / "Server" / "WebUI" + stage_source = project_root / "Data" / "Engine" / "web-interface" - if not legacy_source.is_dir(): + if not stage_source.is_dir(): raise RuntimeError( - f"Engine web interface source missing: {legacy_source}" + f"Engine web interface source missing: {stage_source}" ) index_path = engine_web_root / "index.html" @@ -92,14 +92,14 @@ def _stage_web_interface_assets(logger: Optional[logging.Logger] = None, *, forc if engine_web_root.exists(): shutil.rmtree(engine_web_root) - shutil.copytree(legacy_source, engine_web_root) + shutil.copytree(stage_source, engine_web_root) if not index_path.is_file(): raise RuntimeError( f"Engine web interface staging failed; missing {index_path}" ) - logger.info("Engine web interface staged from %s to %s", legacy_source, engine_web_root) + logger.info("Engine web interface staged from %s to %s", stage_source, engine_web_root) return engine_web_root diff --git a/Data/Engine/services/API/scheduled_jobs/legacy_job_scheduler.py b/Data/Engine/services/API/scheduled_jobs/legacy_job_scheduler.py new file mode 100644 index 00000000..91960102 --- /dev/null +++ b/Data/Engine/services/API/scheduled_jobs/legacy_job_scheduler.py @@ -0,0 +1,1838 @@ +import os +import time +import json +import os +import base64 +import re +import sqlite3 +from typing import Any, Dict, List, Optional, Tuple, Callable + +_WINRM_USERNAME_VAR = "__borealis_winrm_username" +_WINRM_PASSWORD_VAR = "__borealis_winrm_password" +_WINRM_TRANSPORT_VAR = "__borealis_winrm_transport" + +""" +Job Scheduler module for Borealis + +Responsibilities: +- Maintain an internal clock/loop that evaluates scheduled jobs +- When a job's scheduled time arrives, mark it Running, then immediately Success (placeholder) +- Track simple run history in a dedicated table +- Expose job CRUD endpoints and computed scheduling metadata + +This module registers its Flask routes against the provided app and uses +socketio.start_background_task to run the scheduler loop cooperatively +with eventlet. +""" + + +def _now_ts() -> int: + return int(time.time()) + + +def _env_string(value: Any) -> str: + if isinstance(value, bool): + return "True" if value else "False" + if value is None: + return "" + return str(value) + + +def _decode_base64_text(value: Any) -> Optional[str]: + if not isinstance(value, str): + return None + stripped = value.strip() + if not stripped: + return "" + try: + cleaned = re.sub(r"\s+", "", stripped) + except Exception: + cleaned = stripped + try: + decoded = base64.b64decode(cleaned, validate=True) + except Exception: + return None + try: + return decoded.decode("utf-8") + except Exception: + return decoded.decode("utf-8", errors="replace") + + +def _inject_winrm_credential( + base_values: Optional[Dict[str, Any]], + credential: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + values: Dict[str, Any] = dict(base_values or {}) + if not credential: + return values + + username = str(credential.get("username") or "") + password = str(credential.get("password") or "") + metadata = credential.get("metadata") if isinstance(credential.get("metadata"), dict) else {} + transport = metadata.get("winrm_transport") if isinstance(metadata, dict) else None + transport_str = str(transport or "ntlm").strip().lower() or "ntlm" + + values[_WINRM_USERNAME_VAR] = username + values[_WINRM_PASSWORD_VAR] = password + values[_WINRM_TRANSPORT_VAR] = transport_str + return values + + +def _decode_script_content(value: Any, encoding_hint: str = "") -> str: + encoding = (encoding_hint or "").strip().lower() + if isinstance(value, str): + if encoding in ("base64", "b64", "base-64"): + decoded = _decode_base64_text(value) + if decoded is not None: + return decoded.replace("\r\n", "\n") + decoded = _decode_base64_text(value) + if decoded is not None: + return decoded.replace("\r\n", "\n") + return value.replace("\r\n", "\n") + return "" + + +def _encode_script_content(script_text: Any) -> str: + if not isinstance(script_text, str): + if script_text is None: + script_text = "" + else: + script_text = str(script_text) + normalized = script_text.replace("\r\n", "\n") + if not normalized: + return "" + encoded = base64.b64encode(normalized.encode("utf-8")) + return encoded.decode("ascii") + + +def _canonical_env_key(name: Any) -> str: + try: + return re.sub(r"[^A-Za-z0-9_]", "_", str(name or "").strip()).upper() + except Exception: + return "" + + +def _expand_env_aliases(env_map: Dict[str, str], variables: List[Dict[str, Any]]) -> Dict[str, str]: + expanded: Dict[str, str] = dict(env_map or {}) + if not isinstance(variables, list): + return expanded + for var in variables: + if not isinstance(var, dict): + continue + name = str(var.get("name") or "").strip() + if not name: + continue + canonical = _canonical_env_key(name) + if not canonical or canonical not in expanded: + continue + value = expanded[canonical] + alias = re.sub(r"[^A-Za-z0-9_]", "_", name) + if alias and alias not in expanded: + expanded[alias] = value + if alias != name and re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name) and name not in expanded: + expanded[name] = value + return expanded + + +def _powershell_literal(value: Any, var_type: str) -> str: + typ = str(var_type or "string").lower() + if typ == "boolean": + if isinstance(value, bool): + truthy = value + elif value is None: + truthy = False + elif isinstance(value, (int, float)): + truthy = value != 0 + else: + s = str(value).strip().lower() + if s in {"true", "1", "yes", "y", "on"}: + truthy = True + elif s in {"false", "0", "no", "n", "off", ""}: + truthy = False + else: + truthy = bool(s) + return "$true" if truthy else "$false" + if typ == "number": + if value is None or value == "": + return "0" + return str(value) + s = "" if value is None else str(value) + return "'" + s.replace("'", "''") + "'" + + +def _extract_variable_default(var: Dict[str, Any]) -> Any: + for key in ("value", "default", "defaultValue", "default_value"): + if key in var: + val = var.get(key) + return "" if val is None else val + return "" + + +def _prepare_variable_context(doc_variables: List[Dict[str, Any]], overrides: Dict[str, Any]): + env_map: Dict[str, str] = {} + variables: List[Dict[str, Any]] = [] + literal_lookup: Dict[str, str] = {} + doc_names: Dict[str, bool] = {} + + overrides = overrides or {} + + if not isinstance(doc_variables, list): + doc_variables = [] + + for var in doc_variables: + if not isinstance(var, dict): + continue + name = str(var.get("name") or "").strip() + if not name: + continue + doc_names[name] = True + canonical = _canonical_env_key(name) + var_type = str(var.get("type") or "string").lower() + default_val = _extract_variable_default(var) + final_val = overrides[name] if name in overrides else default_val + if canonical: + env_map[canonical] = _env_string(final_val) + literal_lookup[canonical] = _powershell_literal(final_val, var_type) + if name in overrides: + new_var = dict(var) + new_var["value"] = overrides[name] + variables.append(new_var) + else: + variables.append(var) + + for name, val in overrides.items(): + if name in doc_names: + continue + canonical = _canonical_env_key(name) + if canonical: + env_map[canonical] = _env_string(val) + literal_lookup[canonical] = _powershell_literal(val, "string") + variables.append({"name": name, "value": val, "type": "string"}) + + env_map = _expand_env_aliases(env_map, variables) + return env_map, variables, literal_lookup + + +_ENV_VAR_PATTERN = re.compile(r"(?i)\$env:(\{)?([A-Za-z0-9_\-]+)(?(1)\})") + + +def _rewrite_powershell_script(content: str, literal_lookup: Dict[str, str]) -> str: + if not content or not literal_lookup: + return content + + def _replace(match: Any) -> str: + name = match.group(2) + canonical = _canonical_env_key(name) + if not canonical: + return match.group(0) + literal = literal_lookup.get(canonical) + if literal is None: + return match.group(0) + return literal + + return _ENV_VAR_PATTERN.sub(_replace, content) + + +def _parse_ts(val: Any) -> Optional[int]: + """Best effort to parse ISO-ish datetime string or numeric seconds to epoch seconds.""" + if val is None: + return None + if isinstance(val, (int, float)): + return int(val) + try: + from datetime import datetime + s = str(val).strip().replace("Z", "+00:00") + dt = datetime.fromisoformat(s) + return int(dt.timestamp()) + except Exception: + return None + + +def _parse_expiration(s: Optional[str]) -> Optional[int]: + """Parse expiration shorthand to seconds. + Examples: '30m' -> 1800, '1h' -> 3600, '2d' -> 172800. + Returns None for 'no_expire' or invalid input. + """ + if not s or s == "no_expire": + return None + try: + s = s.strip().lower() + unit = s[-1] + value = int(s[:-1]) + if unit == 'm': + return value * 60 + if unit == 'h': + return value * 3600 + if unit == 'd': + return value * 86400 + # Fallback: treat as minutes if only a number + return int(s) * 60 + except Exception: + return None + + +def _floor_minute(ts: int) -> int: + ts = int(ts or 0) + return ts - (ts % 60) + + +def _now_minute() -> int: + return _floor_minute(_now_ts()) + + +def _add_months(dt_tuple: Tuple[int, int, int, int, int, int], months: int = 1) -> int: + """Advance a date by N months and return epoch seconds. + dt_tuple = (year, month, day, hour, minute, second) + Handles month-end clamping. + """ + from calendar import monthrange + from datetime import datetime + + y, m, d, hh, mm, ss = dt_tuple + m2 = m + months + y += (m2 - 1) // 12 + m2 = ((m2 - 1) % 12) + 1 + # Clamp day to last day of new month + last_day = monthrange(y, m2)[1] + d = min(d, last_day) + try: + return int(datetime(y, m2, d, hh, mm, ss).timestamp()) + except Exception: + # Fallback to first of month if something odd + return int(datetime(y, m2, 1, hh, mm, ss).timestamp()) + + +def _add_years(dt_tuple: Tuple[int, int, int, int, int, int], years: int = 1) -> int: + from datetime import datetime + y, m, d, hh, mm, ss = dt_tuple + y += years + # Handle Feb 29 -> Feb 28 if needed + try: + return int(datetime(y, m, d, hh, mm, ss).timestamp()) + except Exception: + # clamp day to 28 + d2 = 28 if (m == 2 and d > 28) else 1 + return int(datetime(y, m, d2, hh, mm, ss).timestamp()) + + +def _to_dt_tuple(ts: int) -> Tuple[int, int, int, int, int, int]: + from datetime import datetime + dt = datetime.utcfromtimestamp(int(ts)) + return (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second) + + +class JobScheduler: + def __init__(self, app, socketio, db_path: str, script_signer=None): + self.app = app + self.socketio = socketio + self.db_path = db_path + self._script_signer = script_signer + self._running = False + # Simulated run duration to hold jobs in "Running" before Success + self.SIMULATED_RUN_SECONDS = int(os.environ.get("BOREALIS_SIM_RUN_SECONDS", "30")) + # Retention for run history (days) + self.RETENTION_DAYS = int(os.environ.get("BOREALIS_JOB_HISTORY_DAYS", "30")) + # Callback to retrieve current set of online hostnames + self._online_lookup: Optional[Callable[[], List[str]]] = None + # Optional callback to execute Ansible directly from the server + self._server_ansible_runner: Optional[Callable[..., str]] = None + # Optional callback to fetch stored credentials (with decrypted secrets) + self._credential_fetcher: Optional[Callable[[int], Optional[Dict[str, Any]]]] = None + + # Ensure run-history table exists + self._init_tables() + + # Bind routes + self._register_routes() + + # ---------- Helpers for dispatching scripts ---------- + def _scripts_root(self) -> str: + import os + # Unified Assemblies root; script paths should include top-level + # folder such as "Scripts" or "Ansible Playbooks". + return os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies") + ) + + def _is_valid_scripts_relpath(self, rel_path: str) -> bool: + try: + p = (rel_path or "").replace("\\", "/").lstrip("/") + if not p: + return False + top = p.split("/", 1)[0] + return top in ("Scripts",) + except Exception: + return False + + def _detect_script_type(self, filename: str) -> str: + fn_lower = (filename or "").lower() + if fn_lower.endswith(".json") and os.path.isfile(filename): + try: + with open(filename, "r", encoding="utf-8") as fh: + data = json.load(fh) + if isinstance(data, dict): + typ = str(data.get("type") or data.get("script_type") or "").strip().lower() + if typ in ("powershell", "batch", "bash", "ansible"): + return typ + except Exception: + pass + return "powershell" + if fn_lower.endswith(".yml"): + return "ansible" + if fn_lower.endswith(".ps1"): + return "powershell" + if fn_lower.endswith(".bat"): + return "batch" + if fn_lower.endswith(".sh"): + return "bash" + return "unknown" + + def _load_assembly_document(self, abs_path: str, default_type: str) -> Dict[str, Any]: + base_name = os.path.splitext(os.path.basename(abs_path))[0] + doc: Dict[str, Any] = { + "name": base_name, + "description": "", + "category": "application" if default_type == "ansible" else "script", + "type": default_type, + "script": "", + "variables": [], + "files": [], + "timeout_seconds": 3600, + } + if abs_path.lower().endswith(".json") and os.path.isfile(abs_path): + try: + with open(abs_path, "r", encoding="utf-8") as fh: + data = json.load(fh) + except Exception: + data = {} + if isinstance(data, dict): + doc["name"] = str(data.get("name") or doc["name"]) + doc["description"] = str(data.get("description") or "") + cat = str(data.get("category") or doc["category"]).strip().lower() + if cat in ("application", "script"): + doc["category"] = cat + typ = str(data.get("type") or data.get("script_type") or default_type).strip().lower() + if typ in ("powershell", "batch", "bash", "ansible"): + doc["type"] = typ + script_val = data.get("script") + content_val = data.get("content") + script_lines = data.get("script_lines") + if isinstance(script_lines, list): + try: + doc["script"] = "\n".join(str(line) for line in script_lines) + except Exception: + doc["script"] = "" + elif isinstance(script_val, str): + doc["script"] = script_val + else: + if isinstance(content_val, str): + doc["script"] = content_val + encoding_hint = str(data.get("script_encoding") or data.get("scriptEncoding") or "").strip().lower() + doc["script"] = _decode_script_content(doc.get("script"), encoding_hint) + if encoding_hint in ("base64", "b64", "base-64"): + doc["script_encoding"] = "base64" + else: + probe_source = "" + if isinstance(script_val, str) and script_val: + probe_source = script_val + elif isinstance(content_val, str) and content_val: + probe_source = content_val + decoded_probe = _decode_base64_text(probe_source) if probe_source else None + if decoded_probe is not None: + doc["script_encoding"] = "base64" + doc["script"] = decoded_probe.replace("\r\n", "\n") + else: + doc["script_encoding"] = "plain" + try: + timeout_raw = data.get("timeout_seconds", data.get("timeout")) + if timeout_raw is None: + doc["timeout_seconds"] = 3600 + else: + doc["timeout_seconds"] = max(0, int(timeout_raw)) + except Exception: + doc["timeout_seconds"] = 3600 + vars_in = data.get("variables") if isinstance(data.get("variables"), list) else [] + doc["variables"] = [] + for v in vars_in: + if not isinstance(v, dict): + continue + name = str(v.get("name") or v.get("key") or "").strip() + if not name: + continue + vtype = str(v.get("type") or "string").strip().lower() + if vtype not in ("string", "number", "boolean", "credential"): + vtype = "string" + doc["variables"].append({ + "name": name, + "label": str(v.get("label") or ""), + "type": vtype, + "default": v.get("default", v.get("default_value")), + "required": bool(v.get("required")), + "description": str(v.get("description") or ""), + }) + files_in = data.get("files") if isinstance(data.get("files"), list) else [] + doc["files"] = [] + for f in files_in: + if not isinstance(f, dict): + continue + fname = f.get("file_name") or f.get("name") + if not fname or not isinstance(f.get("data"), str): + continue + try: + size_val = int(f.get("size") or 0) + except Exception: + size_val = 0 + doc["files"].append({ + "file_name": str(fname), + "size": size_val, + "mime_type": str(f.get("mime_type") or f.get("mimeType") or ""), + "data": f.get("data"), + }) + return doc + try: + with open(abs_path, "r", encoding="utf-8", errors="replace") as fh: + content = fh.read() + except Exception: + content = "" + normalized_script = (content or "").replace("\r\n", "\n") + doc["script"] = normalized_script + return doc + + def _ansible_root(self) -> str: + import os + return os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Ansible_Playbooks") + ) + + def _dispatch_ansible( + self, + hostname: str, + component: Dict[str, Any], + scheduled_job_id: int, + scheduled_run_row_id: int, + run_mode: str, + credential_id: Optional[int] = None, + use_service_account: bool = False, + ) -> Optional[Dict[str, Any]]: + try: + import os, uuid + ans_root = self._ansible_root() + rel_path = "" + overrides_map: Dict[str, Any] = {} + if isinstance(component, dict): + rel_path = component.get("path") or component.get("playbook_path") or component.get("script_path") or "" + raw_overrides = component.get("variable_values") + if isinstance(raw_overrides, dict): + for key, val in raw_overrides.items(): + name = str(key or "").strip() + if not name: + continue + overrides_map[name] = val + comp_vars = component.get("variables") + if isinstance(comp_vars, list): + for var in comp_vars: + if not isinstance(var, dict): + continue + name = str(var.get("name") or "").strip() + if not name or name in overrides_map: + continue + if "value" in var: + overrides_map[name] = var.get("value") + else: + rel_path = str(component or "") + rel_norm = (rel_path or "").replace("\\", "/").lstrip("/") + abs_path = os.path.abspath(os.path.join(ans_root, rel_norm)) + if (not abs_path.startswith(ans_root)) or (not os.path.isfile(abs_path)): + return None + doc = self._load_assembly_document(abs_path, "ansible") + content = doc.get("script") or "" + normalized_script = (content or "").replace("\r\n", "\n") + script_bytes = normalized_script.encode("utf-8") + encoded_content = base64.b64encode(script_bytes).decode("ascii") if script_bytes or normalized_script == "" else "" + signature_b64: Optional[str] = None + sig_alg: Optional[str] = None + signing_key_b64: Optional[str] = None + if self._script_signer is not None: + try: + signature = self._script_signer.sign(script_bytes) + signature_b64 = base64.b64encode(signature).decode("ascii") + sig_alg = "ed25519" + signing_key_b64 = self._script_signer.public_base64_spki() + except Exception: + signature_b64 = None + sig_alg = None + signing_key_b64 = None + variables = doc.get("variables") or [] + files = doc.get("files") or [] + run_mode_norm = (run_mode or "system").strip().lower() + server_run = run_mode_norm == "ssh" + agent_winrm = run_mode_norm == "winrm" + + if agent_winrm and not use_service_account: + if not credential_id: + raise RuntimeError("WinRM execution requires a credential_id") + if not callable(self._credential_fetcher): + raise RuntimeError("Credential fetcher is not configured") + cred_detail = self._credential_fetcher(int(credential_id)) + if not cred_detail: + raise RuntimeError("Credential not found") + try: + overrides_map = _inject_winrm_credential(overrides_map, cred_detail) + finally: + try: + cred_detail.clear() # type: ignore[attr-defined] + except Exception: + pass + + # Record in activity_history for UI parity + now = _now_ts() + act_id = None + conn = self._conn() + cur = conn.cursor() + try: + cur.execute( + """ + INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr) + VALUES(?,?,?,?,?,?,?,?) + """, + ( + str(hostname), + rel_norm, + doc.get("name") or os.path.basename(abs_path), + "ansible", + now, + "Running", + "", + "", + ), + ) + act_id = cur.lastrowid + conn.commit() + finally: + conn.close() + + if server_run: + if not credential_id: + raise RuntimeError("Remote execution requires a credential_id") + if not callable(self._server_ansible_runner): + raise RuntimeError("Server-side Ansible runner is not configured") + try: + self._server_ansible_runner( + hostname=str(hostname), + playbook_abs_path=abs_path, + playbook_rel_path=rel_norm, + playbook_name=doc.get("name") or os.path.basename(abs_path), + credential_id=int(credential_id), + variable_values=overrides_map, + source="scheduled_job", + activity_id=act_id, + scheduled_job_id=scheduled_job_id, + scheduled_run_id=scheduled_run_row_id, + scheduled_job_run_row_id=scheduled_run_row_id, + ) + except Exception as exc: + try: + self.app.logger.warning( + "[Scheduler] Server-side Ansible queue failed job=%s run=%s host=%s err=%s", + scheduled_job_id, + scheduled_run_row_id, + hostname, + exc, + ) + except Exception: + print(f"[Scheduler] Server-side Ansible queue failed job={scheduled_job_id} host={hostname} err={exc}") + if act_id: + try: + conn_fail = self._conn() + cur_fail = conn_fail.cursor() + cur_fail.execute( + "UPDATE activity_history SET status='Failed', stderr=?, ran_at=? WHERE id=?", + (str(exc), _now_ts(), act_id), + ) + conn_fail.commit() + conn_fail.close() + except Exception: + pass + raise + else: + payload = { + "run_id": uuid.uuid4().hex, + "target_hostname": str(hostname), + "playbook_name": doc.get("name") or os.path.basename(abs_path), + "playbook_content": encoded_content, + "playbook_encoding": "base64", + "activity_job_id": act_id, + "scheduled_job_id": int(scheduled_job_id), + "scheduled_run_id": int(scheduled_run_row_id), + "connection": "winrm", + "variables": variables, + "files": files, + "variable_values": overrides_map, + } + try: + self.socketio.emit("ansible_playbook_run", payload) + except Exception: + pass + if act_id: + return { + "activity_id": int(act_id), + "component_name": doc.get("name") or os.path.basename(abs_path), + "component_path": rel_norm, + "script_type": "ansible", + "component_kind": "ansible", + } + return None + except Exception: + pass + + def _dispatch_script(self, hostname: str, component: Dict[str, Any], run_mode: str) -> Optional[Dict[str, Any]]: + """Emit a quick_job_run event to agents for the given script/host. + Mirrors /api/scripts/quick_run behavior for scheduled jobs. + """ + try: + scripts_root = self._scripts_root() + import os + rel_path_raw = "" + if isinstance(component, dict): + rel_path_raw = str(component.get("path") or component.get("script_path") or "") + else: + rel_path_raw = str(component or "") + path_norm = (rel_path_raw or "").replace("\\", "/").strip() + if path_norm and not path_norm.startswith("Scripts/"): + path_norm = f"Scripts/{path_norm}" + abs_path = os.path.abspath(os.path.join(scripts_root, path_norm)) + if (not abs_path.startswith(scripts_root)) or (not self._is_valid_scripts_relpath(path_norm)) or (not os.path.isfile(abs_path)): + return None + doc = self._load_assembly_document(abs_path, "powershell") + stype = (doc.get("type") or "powershell").lower() + # For now, only PowerShell is supported by agents for scheduled jobs + if stype != "powershell": + return None + content = doc.get("script") or "" + doc_variables = doc.get("variables") if isinstance(doc.get("variables"), list) else [] + + overrides: Dict[str, Any] = {} + if isinstance(component, dict): + if isinstance(component.get("variable_values"), dict): + for key, val in component.get("variable_values").items(): + name = str(key or "").strip() + if name: + overrides[name] = val + if isinstance(component.get("variables"), list): + for var in component.get("variables"): + if not isinstance(var, dict): + continue + name = str(var.get("name") or "").strip() + if not name: + continue + if "value" in var: + overrides[name] = var.get("value") + + env_map, variables, literal_lookup = _prepare_variable_context(doc_variables, overrides) + content = _rewrite_powershell_script(content, literal_lookup) + encoded_content = _encode_script_content(content) + timeout_seconds = 0 + try: + timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0)) + except Exception: + timeout_seconds = 0 + + # Insert into activity_history for device for parity with Quick Job + import sqlite3 + now = _now_ts() + act_id = None + conn = sqlite3.connect(self.db_path) + cur = conn.cursor() + try: + cur.execute( + """ + INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr) + VALUES(?,?,?,?,?,?,?,?) + """, + ( + str(hostname), + path_norm, + doc.get("name") or os.path.basename(abs_path), + stype, + now, + "Running", + "", + "", + ), + ) + act_id = cur.lastrowid + conn.commit() + finally: + conn.close() + + payload = { + "job_id": act_id, + "target_hostname": str(hostname), + "script_type": stype, + "script_name": doc.get("name") or os.path.basename(abs_path), + "script_path": path_norm, + "script_content": encoded_content, + "script_encoding": "base64", + "environment": env_map, + "variables": variables, + "timeout_seconds": timeout_seconds, + "files": doc.get("files") or [], + "run_mode": (run_mode or "system").strip().lower(), + "admin_user": "", + "admin_pass": "", + } + if signature_b64: + payload["signature"] = signature_b64 + if sig_alg: + payload["sig_alg"] = sig_alg + if signing_key_b64: + payload["signing_key"] = signing_key_b64 + try: + self.socketio.emit("quick_job_run", payload) + except Exception: + pass + if act_id: + return { + "activity_id": int(act_id), + "component_name": doc.get("name") or os.path.basename(abs_path), + "component_path": path_norm, + "script_type": stype, + "component_kind": "script", + } + return None + except Exception: + # Keep scheduler resilient + pass + return None + + # ---------- DB helpers ---------- + def _conn(self): + return sqlite3.connect(self.db_path) + + def set_credential_fetcher(self, fn: Optional[Callable[[int], Optional[Dict[str, Any]]]]): + self._credential_fetcher = fn + + def _init_tables(self): + conn = self._conn() + cur = conn.cursor() + # Runs table captures each firing + cur.execute( + """ + CREATE TABLE IF NOT EXISTS scheduled_job_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER NOT NULL, + scheduled_ts INTEGER, + started_ts INTEGER, + finished_ts INTEGER, + status TEXT, + error TEXT, + created_at INTEGER, + updated_at INTEGER, + FOREIGN KEY(job_id) REFERENCES scheduled_jobs(id) ON DELETE CASCADE + ) + """ + ) + # Add per-target column if missing + try: + cur.execute("PRAGMA table_info(scheduled_job_runs)") + cols = {row[1] for row in cur.fetchall()} + if 'target_hostname' not in cols: + cur.execute("ALTER TABLE scheduled_job_runs ADD COLUMN target_hostname TEXT") + except Exception: + pass + # Helpful index for lookups + try: + cur.execute("CREATE INDEX IF NOT EXISTS idx_runs_job_sched_target ON scheduled_job_runs(job_id, scheduled_ts, target_hostname)") + except Exception: + pass + try: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS scheduled_job_run_activity ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id INTEGER NOT NULL, + activity_id INTEGER NOT NULL, + component_kind TEXT, + script_type TEXT, + component_path TEXT, + component_name TEXT, + created_at INTEGER, + FOREIGN KEY(run_id) REFERENCES scheduled_job_runs(id) ON DELETE CASCADE, + FOREIGN KEY(activity_id) REFERENCES activity_history(id) ON DELETE CASCADE + ) + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_run_activity_run ON scheduled_job_run_activity(run_id)") + cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_run_activity_activity ON scheduled_job_run_activity(activity_id)") + except Exception: + pass + conn.commit() + conn.close() + + # ---------- Scheduling core ---------- + def _get_last_run(self, job_id: int) -> Optional[Dict[str, Any]]: + conn = self._conn() + cur = conn.cursor() + cur.execute( + "SELECT id, scheduled_ts, started_ts, finished_ts, status FROM scheduled_job_runs WHERE job_id=? ORDER BY COALESCE(started_ts, scheduled_ts, 0) DESC, id DESC LIMIT 1", + (job_id,) + ) + row = cur.fetchone() + conn.close() + if not row: + return None + return { + "id": row[0], + "scheduled_ts": row[1], + "started_ts": row[2], + "finished_ts": row[3], + "status": row[4] or "", + } + + def _compute_next_run(self, schedule_type: str, start_ts: Optional[int], last_run_ts: Optional[int], now_ts: int) -> Optional[int]: + st = (schedule_type or "immediately").strip().lower() + start_ts = _floor_minute(start_ts) if start_ts else None + last_run_ts = _floor_minute(last_run_ts) if last_run_ts else None + now_ts = _floor_minute(now_ts) + if st == "immediately": + # Run once asap if never ran + return None if last_run_ts else now_ts + if st == "once": + if not start_ts: + return None + # If never ran and time in future/now + if not last_run_ts: + return start_ts + return None + if not start_ts: + return None + + # For recurring types, base off start_ts and last_run_ts + last = last_run_ts if last_run_ts else None + # Minute/Hour intervals + if st in ("every_5_minutes", "every_10_minutes", "every_15_minutes", "every_30_minutes", "every_hour"): + period_map = { + "every_5_minutes": 5 * 60, + "every_10_minutes": 10 * 60, + "every_15_minutes": 15 * 60, + "every_30_minutes": 30 * 60, + "every_hour": 60 * 60, + } + period = period_map.get(st) + candidate = (last + period) if last else start_ts + while candidate is not None and candidate <= now_ts - 1: + candidate += period + return candidate + if st == "daily": + period = 86400 + candidate = last + period if last else start_ts + while candidate is not None and candidate <= now_ts - 1: + candidate += period + return candidate + if st == "weekly": + period = 7 * 86400 + candidate = last + period if last else start_ts + while candidate is not None and candidate <= now_ts - 1: + candidate += period + return candidate + if st == "monthly": + base = _to_dt_tuple(last) if last else _to_dt_tuple(start_ts) + candidate = _add_months(base, 1 if last else 0) + while candidate <= now_ts - 1: + base = _to_dt_tuple(candidate) + candidate = _add_months(base, 1) + return candidate + if st == "yearly": + base = _to_dt_tuple(last) if last else _to_dt_tuple(start_ts) + candidate = _add_years(base, 1 if last else 0) + while candidate <= now_ts - 1: + base = _to_dt_tuple(candidate) + candidate = _add_years(base, 1) + return candidate + return None + + def _should_expire(self, started_ts: Optional[int], expiration: Optional[str], now_ts: int) -> bool: + if not started_ts: + return False + seconds = _parse_expiration(expiration) + if not seconds: + return False + return (started_ts + seconds) <= now_ts + + def _tick_once(self): + """Evaluate all enabled scheduled jobs and trigger those due. + Placeholder execution: mark Running then Success immediately. + """ + now = _now_ts() + conn = self._conn() + cur = conn.cursor() + # First, mark any stale Running runs that exceeded job expiration as Timed Out + try: + cur.execute( + """ + SELECT r.id, r.started_ts, j.expiration + FROM scheduled_job_runs r + JOIN scheduled_jobs j ON j.id = r.job_id + WHERE r.status = 'Running' + """ + ) + rows = cur.fetchall() + for rid, started_ts, expiration in rows: + if self._should_expire(started_ts, expiration, now): + try: + c2 = conn.cursor() + c2.execute( + "UPDATE scheduled_job_runs SET status='Timed Out', updated_at=? WHERE id=?", + (now, rid), + ) + conn.commit() + except Exception: + pass + except Exception: + pass + + # Next, finalize any Running runs that have passed the simulated duration + try: + cur.execute( + "SELECT id, started_ts FROM scheduled_job_runs WHERE status='Running'" + ) + rows = cur.fetchall() + for rid, started_ts in rows: + if started_ts and (int(started_ts) + self.SIMULATED_RUN_SECONDS) <= now: + try: + c2 = conn.cursor() + c2.execute( + "UPDATE scheduled_job_runs SET finished_ts=?, status='Success', updated_at=? WHERE id=?", + (now, now, rid), + ) + conn.commit() + except Exception: + pass + except Exception: + pass + + # Finally, rotate history older than the retention window + try: + cutoff = now - (self.RETENTION_DAYS * 86400) + cur.execute( + "DELETE FROM scheduled_job_runs WHERE COALESCE(finished_ts, started_ts, scheduled_ts, 0) < ?", + (cutoff,) + ) + conn.commit() + except Exception: + pass + try: + cur.execute( + "SELECT id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, credential_id, use_service_account, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC" + ) + jobs = cur.fetchall() + except Exception: + jobs = [] + conn.close() + + # Online hostnames snapshot for this tick + online = set() + try: + if callable(self._online_lookup): + online = set(self._online_lookup() or []) + except Exception: + online = set() + + five_min = 300 + now_min = _now_minute() + + for ( + job_id, + components_json, + targets_json, + schedule_type, + start_ts, + expiration, + execution_context, + credential_id, + use_service_account_flag, + created_at, + ) in jobs: + try: + # Targets list for this job + try: + targets = json.loads(targets_json or "[]") + except Exception: + targets = [] + targets = [str(t) for t in targets if isinstance(t, (str, int))] + total_targets = len(targets) + + # Determine scripts to run for this job (first-pass: all 'script' components) + try: + comps = json.loads(components_json or "[]") + except Exception: + comps = [] + script_components = [] + ansible_components = [] + for c in comps: + try: + ctype = (c or {}).get("type") + if ctype == "script": + p = (c.get("path") or c.get("script_path") or "").strip() + if p: + comp_copy = dict(c) + comp_copy["path"] = p + script_components.append(comp_copy) + elif ctype == "ansible": + p = (c.get("path") or "").strip() + if p: + comp_copy = dict(c) + comp_copy["path"] = p + ansible_components.append(comp_copy) + except Exception: + continue + run_mode = (execution_context or "system").strip().lower() + job_credential_id = None + job_use_service_account = bool(use_service_account_flag) + if run_mode != "winrm": + job_use_service_account = False + try: + job_credential_id = int(credential_id) if credential_id is not None else None + except Exception: + job_credential_id = None + + exp_seconds = _parse_expiration(expiration) + + # Determine current occurrence to work on + occ = None + try: + conn2 = self._conn() + c2 = conn2.cursor() + c2.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (job_id,)) + occ_from_runs = c2.fetchone()[0] + conn2.close() + except Exception: + occ_from_runs = None + + if occ_from_runs: + occ = int(occ_from_runs) + # Check if occurrence is complete (terminal status per target) + try: + conn2 = self._conn() + c2 = conn2.cursor() + c2.execute( + "SELECT COUNT(DISTINCT target_hostname) FROM scheduled_job_runs WHERE job_id=? AND scheduled_ts=? AND status IN ('Success','Expired','Timed Out')", + (job_id, occ) + ) + done_count = int(c2.fetchone()[0] or 0) + conn2.close() + except Exception: + done_count = 0 + + if total_targets > 0 and done_count >= total_targets: + # Move to next occurrence if due in window + nxt = self._compute_next_run(schedule_type, start_ts, occ, now_min) + if nxt is not None and now_min >= nxt and (now_min - nxt) <= five_min: + occ = nxt + else: + # Nothing to do this tick for this job + continue + else: + # Continue working on this occurrence regardless of the 5-minute window until expiration + pass + else: + # No occurrence yet; derive initial occurrence + if (schedule_type or '').lower() == 'immediately': + occ = _floor_minute(created_at or now_min) + else: + st_min = _floor_minute(start_ts) if start_ts else None + if st_min is None: + # Try compute_next_run to derive first planned occurrence + occ = self._compute_next_run(schedule_type, start_ts, None, now_min) + else: + occ = st_min + if occ is None: + continue + # For first occurrence, require it be within the 5-minute window to trigger + if not (now_min >= occ and (now_min - occ) <= five_min): + continue + + # For each target, if no run exists for this occurrence, either start it (if online) or keep waiting/expire + for host in targets: + # Skip if a run already exists for this job/host/occurrence + try: + conn2 = self._conn() + c2 = conn2.cursor() + c2.execute( + "SELECT id, status, started_ts FROM scheduled_job_runs WHERE job_id=? AND target_hostname=? AND scheduled_ts=? ORDER BY id DESC LIMIT 1", + (job_id, host, occ) + ) + row = c2.fetchone() + except Exception: + row = None + if row: + # Existing record - if Running, timeout handled earlier; skip + conn2.close() + continue + + # Start if online; otherwise, wait until expiration + if host in online: + try: + ts_now = _now_ts() + c2.execute( + "INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, started_ts, status, created_at, updated_at) VALUES (?,?,?,?,?,?,?)", + (job_id, host, occ, ts_now, "Running", ts_now, ts_now), + ) + run_row_id = c2.lastrowid or 0 + conn2.commit() + activity_links: List[Dict[str, Any]] = [] + remote_requires_cred = (run_mode == "ssh") or (run_mode == "winrm" and not job_use_service_account) + if remote_requires_cred and not job_credential_id: + err_msg = "Credential required for remote execution" + c2.execute( + "UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?", + (ts_now, ts_now, err_msg, run_row_id), + ) + conn2.commit() + else: + # Dispatch all script components for this job to the target host + for comp in script_components: + try: + link = self._dispatch_script(host, comp, run_mode) + if link and link.get("activity_id"): + activity_links.append({ + "run_id": run_row_id, + "activity_id": int(link["activity_id"]), + "component_kind": link.get("component_kind") or "script", + "script_type": link.get("script_type") or "powershell", + "component_path": link.get("component_path") or "", + "component_name": link.get("component_name") or "", + }) + except Exception: + continue + # Dispatch ansible playbooks for this job to the target host + for comp in ansible_components: + try: + link = self._dispatch_ansible( + host, + comp, + job_id, + run_row_id, + run_mode, + job_credential_id, + job_use_service_account, + ) + if link and link.get("activity_id"): + activity_links.append({ + "run_id": run_row_id, + "activity_id": int(link["activity_id"]), + "component_kind": link.get("component_kind") or "ansible", + "script_type": link.get("script_type") or "ansible", + "component_path": link.get("component_path") or "", + "component_name": link.get("component_name") or "", + }) + except Exception as exc: + try: + c2.execute( + "UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?", + (ts_now, ts_now, str(exc)[:512], run_row_id), + ) + conn2.commit() + except Exception: + pass + continue + if activity_links: + try: + for link in activity_links: + c2.execute( + "INSERT OR IGNORE INTO scheduled_job_run_activity(run_id, activity_id, component_kind, script_type, component_path, component_name, created_at) VALUES (?,?,?,?,?,?,?)", + ( + int(link["run_id"]), + int(link["activity_id"]), + link.get("component_kind") or "", + link.get("script_type") or "", + link.get("component_path") or "", + link.get("component_name") or "", + ts_now, + ), + ) + conn2.commit() + except Exception: + pass + except Exception: + pass + finally: + conn2.close() + else: + # If expired window is configured and has passed, mark this host as Expired + if exp_seconds is not None and (occ + exp_seconds) <= now: + try: + ts_now = _now_ts() + c2.execute( + "INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, finished_ts, status, error, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)", + (job_id, host, occ, ts_now, "Expired", "Device offline", ts_now, ts_now), + ) + conn2.commit() + except Exception: + pass + finally: + conn2.close() + else: + # keep waiting; no record created yet + try: + conn2.close() + except Exception: + pass + except Exception: + # Keep loop resilient + continue + + def start(self): + if self._running: + return + self._running = True + + def _loop(): + # cooperative loop aligned to minutes + while self._running: + try: + self._tick_once() + except Exception: + pass + # Sleep until next minute boundary + delay = 60 - (_now_ts() % 60) + try: + import eventlet + eventlet.sleep(delay) + except Exception: + time.sleep(delay) + + # Use SocketIO helper so it integrates with eventlet + try: + self.socketio.start_background_task(_loop) + except Exception: + # Fallback to thread + import threading + threading.Thread(target=_loop, daemon=True).start() + + # ---------- Route registration ---------- + def _register_routes(self): + app = self.app + + # Utility: job row + def _job_row_to_dict(r) -> Dict[str, Any]: + base = { + "id": r[0], + "name": r[1], + "components": json.loads(r[2] or "[]"), + "targets": json.loads(r[3] or "[]"), + "schedule_type": r[4] or "immediately", + "start_ts": r[5], + "duration_stop_enabled": bool(r[6] or 0), + "expiration": r[7] or "no_expire", + "execution_context": r[8] or "system", + "credential_id": r[9], + "use_service_account": bool(r[10] or 0), + "enabled": bool(r[11] or 0), + "created_at": r[12] or 0, + "updated_at": r[13] or 0, + } + # Attach computed status summary for latest occurrence + try: + conn = self._conn() + c = conn.cursor() + c.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (base["id"],)) + max_occ = c.fetchone()[0] + summary_status = None + last_run_ts = None + result_counts = { + "pending": 0, + "running": 0, + "success": 0, + "failed": 0, + "expired": 0, + "timed_out": 0, + "total_targets": len(base.get("targets") or []), + } + if max_occ: + # Summarize statuses for this occurrence + c.execute( + "SELECT status, COUNT(*) FROM scheduled_job_runs WHERE job_id=? AND scheduled_ts=? GROUP BY status", + (base["id"], max_occ) + ) + counts = {row[0] or "": int(row[1] or 0) for row in c.fetchall()} + result_counts["running"] = counts.get("Running", 0) + result_counts["success"] = counts.get("Success", 0) + result_counts["failed"] = counts.get("Failed", 0) + result_counts["expired"] = counts.get("Expired", 0) + result_counts["timed_out"] = counts.get("Timed Out", 0) + computed = result_counts["running"] + result_counts["success"] + result_counts["failed"] + result_counts["expired"] + result_counts["timed_out"] + pend = (result_counts["total_targets"] or 0) - computed + result_counts["pending"] = pend if pend > 0 else 0 + # Priority: Running > Timed Out > Expired > Success + if counts.get("Running"): + summary_status = "Running" + elif counts.get("Timed Out"): + summary_status = "Timed Out" + elif counts.get("Expired"): + summary_status = "Expired" + elif counts.get("Success"): + summary_status = "Success" + last_run_ts = int(max_occ) + conn.close() + except Exception: + summary_status = None + last_run_ts = None + result_counts = { + "pending": len(base.get("targets") or []), + "running": 0, + "success": 0, + "failed": 0, + "expired": 0, + "timed_out": 0, + "total_targets": len(base.get("targets") or []), + } + base["last_run_ts"] = last_run_ts + base["last_status"] = summary_status or ("Scheduled" if base.get("start_ts") else "") + base["latest_occurrence"] = last_run_ts + base["result_counts"] = result_counts + try: + base["next_run_ts"] = self._compute_next_run( + base["schedule_type"], base.get("start_ts"), base.get("last_run_ts"), _now_ts() + ) + except Exception: + base["next_run_ts"] = None + return base + + @app.route("/api/scheduled_jobs", methods=["GET"]) + def api_scheduled_jobs_list(): + try: + conn = self._conn() + cur = conn.cursor() + cur.execute( + """ + SELECT id, name, components_json, targets_json, schedule_type, start_ts, + duration_stop_enabled, expiration, execution_context, credential_id, + use_service_account, enabled, created_at, updated_at + FROM scheduled_jobs + ORDER BY created_at DESC + """ + ) + rows = [ _job_row_to_dict(r) for r in cur.fetchall() ] + conn.close() + return json.dumps({"jobs": rows}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs", methods=["POST"]) + def api_scheduled_jobs_create(): + data = self._json_body() + name = (data.get("name") or "").strip() + components = data.get("components") or [] + targets = data.get("targets") or [] + schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower() + start = data.get("schedule", {}).get("start") or data.get("start") or None + start_ts = _parse_ts(start) if start else None + duration_stop_enabled = int(bool((data.get("duration") or {}).get("stopAfterEnabled") or data.get("duration_stop_enabled"))) + expiration = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire" + execution_context = (data.get("execution_context") or "system").strip().lower() + credential_id = data.get("credential_id") + try: + credential_id = int(credential_id) if credential_id is not None else None + except Exception: + credential_id = None + use_service_account_raw = data.get("use_service_account") + use_service_account = 1 if (execution_context == "winrm" and (use_service_account_raw is None or bool(use_service_account_raw))) else 0 + enabled = int(bool(data.get("enabled", True))) + if not name or not components or not targets: + return json.dumps({"error": "name, components, targets required"}), 400, {"Content-Type": "application/json"} + now = _now_ts() + try: + conn = self._conn() + cur = conn.cursor() + cur.execute( + """ + INSERT INTO scheduled_jobs + (name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) + """, + ( + name, + json.dumps(components), + json.dumps(targets), + schedule_type, + start_ts, + duration_stop_enabled, + expiration, + execution_context, + credential_id, + use_service_account, + enabled, + now, + now, + ), + ) + job_id = cur.lastrowid + conn.commit() + cur.execute( + """ + SELECT id, name, components_json, targets_json, schedule_type, start_ts, + duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at + FROM scheduled_jobs WHERE id=? + """, + (job_id,), + ) + row = cur.fetchone() + conn.close() + return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs/", methods=["GET"]) + def api_scheduled_jobs_get(job_id: int): + try: + conn = self._conn() + cur = conn.cursor() + cur.execute( + """ + SELECT id, name, components_json, targets_json, schedule_type, start_ts, + duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at + FROM scheduled_jobs WHERE id=? + """, + (job_id,), + ) + row = cur.fetchone() + conn.close() + if not row: + return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} + return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs/", methods=["PUT"]) + def api_scheduled_jobs_update(job_id: int): + data = self._json_body() + fields: Dict[str, Any] = {} + if "name" in data: + fields["name"] = (data.get("name") or "").strip() + if "components" in data: + fields["components_json"] = json.dumps(data.get("components") or []) + if "targets" in data: + fields["targets_json"] = json.dumps(data.get("targets") or []) + if "schedule" in data or "schedule_type" in data: + schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower() + fields["schedule_type"] = schedule_type + start = data.get("schedule", {}).get("start") or data.get("start") or None + fields["start_ts"] = _parse_ts(start) if start else None + if "duration" in data or "duration_stop_enabled" in data: + fields["duration_stop_enabled"] = int(bool((data.get("duration") or {}).get("stopAfterEnabled") or data.get("duration_stop_enabled"))) + if "expiration" in data or (data.get("duration") and "expiration" in data.get("duration")): + fields["expiration"] = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire" + if "execution_context" in data: + exec_ctx_val = (data.get("execution_context") or "system").strip().lower() + fields["execution_context"] = exec_ctx_val + if exec_ctx_val != "winrm": + fields["use_service_account"] = 0 + if "credential_id" in data: + cred_val = data.get("credential_id") + if cred_val in (None, "", "null"): + fields["credential_id"] = None + else: + try: + fields["credential_id"] = int(cred_val) + except Exception: + fields["credential_id"] = None + if "use_service_account" in data: + fields["use_service_account"] = 1 if bool(data.get("use_service_account")) else 0 + if "enabled" in data: + fields["enabled"] = int(bool(data.get("enabled"))) + if not fields: + return json.dumps({"error": "no fields to update"}), 400, {"Content-Type": "application/json"} + try: + conn = self._conn() + cur = conn.cursor() + sets = ", ".join([f"{k}=?" for k in fields.keys()]) + params = list(fields.values()) + [_now_ts(), job_id] + cur.execute(f"UPDATE scheduled_jobs SET {sets}, updated_at=? WHERE id=?", params) + if cur.rowcount == 0: + conn.close() + return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} + conn.commit() + cur.execute( + """ + SELECT id, name, components_json, targets_json, schedule_type, start_ts, + duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at + FROM scheduled_jobs WHERE id=? + """, + (job_id,), + ) + row = cur.fetchone() + conn.close() + return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs//toggle", methods=["POST"]) + def api_scheduled_jobs_toggle(job_id: int): + data = self._json_body() + enabled = int(bool(data.get("enabled", True))) + try: + conn = self._conn() + cur = conn.cursor() + cur.execute("UPDATE scheduled_jobs SET enabled=?, updated_at=? WHERE id=?", (enabled, _now_ts(), job_id)) + if cur.rowcount == 0: + conn.close() + return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} + conn.commit() + cur.execute( + "SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=?", + (job_id,), + ) + row = cur.fetchone() + conn.close() + return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs/", methods=["DELETE"]) + def api_scheduled_jobs_delete(job_id: int): + try: + conn = self._conn() + cur = conn.cursor() + cur.execute("DELETE FROM scheduled_jobs WHERE id=?", (job_id,)) + deleted = cur.rowcount + conn.commit() + conn.close() + if deleted == 0: + return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} + return json.dumps({"status": "ok"}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs//runs", methods=["GET"]) + def api_scheduled_job_runs(job_id: int): + try: + from flask import request + days = request.args.get('days', default=str(self.RETENTION_DAYS)) + try: + days = max(1, int(days)) + except Exception: + days = self.RETENTION_DAYS + cutoff = _now_ts() - (days * 86400) + + conn = self._conn() + cur = conn.cursor() + cur.execute( + """ + SELECT id, scheduled_ts, started_ts, finished_ts, status, error + FROM scheduled_job_runs + WHERE job_id=? AND COALESCE(finished_ts, started_ts, scheduled_ts, 0) >= ? + ORDER BY COALESCE(started_ts, scheduled_ts, 0) DESC, id DESC + LIMIT 500 + """, + (job_id, cutoff) + ) + rows = cur.fetchall() + conn.close() + runs = [ + { + "id": r[0], + "scheduled_ts": r[1], + "started_ts": r[2], + "finished_ts": r[3], + "status": r[4] or "", + "error": r[5] or "", + } + for r in rows + ] + return json.dumps({"runs": runs}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs//devices", methods=["GET"]) + def api_scheduled_job_devices(job_id: int): + """Return per-target status for the latest occurrence (or specified via ?occurrence=epoch).""" + try: + from flask import request + occurrence = request.args.get('occurrence') + occ = int(occurrence) if occurrence else None + + conn = self._conn() + cur = conn.cursor() + cur.execute( + "SELECT targets_json FROM scheduled_jobs WHERE id=?", + (job_id,) + ) + row = cur.fetchone() + if not row: + conn.close() + return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} + try: + targets = json.loads(row[0] or "[]") + except Exception: + targets = [] + targets = [str(t) for t in targets if isinstance(t, (str, int))] + + # Determine occurrence if not provided + if occ is None: + cur.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (job_id,)) + occ_row = cur.fetchone() + occ = int(occ_row[0]) if occ_row and occ_row[0] else None + + # Sites lookup + site_by_host: Dict[str, str] = {} + try: + cur.execute( + "SELECT device_hostname, sites.name FROM device_sites JOIN sites ON sites.id = device_sites.site_id" + ) + for h, n in cur.fetchall(): + site_by_host[str(h)] = n or "" + except Exception: + pass + + # Status per target for occurrence + run_by_host: Dict[str, Dict[str, Any]] = {} + run_ids: List[int] = [] + if occ is not None: + try: + cur.execute( + "SELECT id, target_hostname, status, started_ts, finished_ts FROM scheduled_job_runs WHERE job_id=? AND scheduled_ts=? ORDER BY id DESC", + (job_id, occ) + ) + rows = cur.fetchall() + for rid, h, st, st_ts, fin_ts in rows: + h = str(h) + if h not in run_by_host: + run_by_host[h] = { + "status": st or "", + "started_ts": st_ts, + "finished_ts": fin_ts, + "run_id": int(rid), + } + run_ids.append(int(rid)) + except Exception: + pass + + activities_by_run: Dict[int, List[Dict[str, Any]]] = {} + if run_ids: + try: + placeholders = ",".join(["?"] * len(run_ids)) + cur.execute( + f""" + SELECT + s.run_id, + s.activity_id, + s.component_kind, + s.script_type, + s.component_path, + s.component_name, + COALESCE(LENGTH(h.stdout), 0), + COALESCE(LENGTH(h.stderr), 0) + FROM scheduled_job_run_activity s + LEFT JOIN activity_history h ON h.id = s.activity_id + WHERE s.run_id IN ({placeholders}) + """, + run_ids, + ) + for rid, act_id, kind, stype, path, name, so_len, se_len in cur.fetchall(): + rid = int(rid) + entry = { + "activity_id": int(act_id), + "component_kind": kind or "", + "script_type": stype or "", + "component_path": path or "", + "component_name": name or "", + "has_stdout": bool(so_len), + "has_stderr": bool(se_len), + } + activities_by_run.setdefault(rid, []).append(entry) + except Exception: + pass + + conn.close() + + # Online snapshot + online = set() + try: + if callable(self._online_lookup): + online = set(self._online_lookup() or []) + except Exception: + online = set() + + out = [] + for host in targets: + rec = run_by_host.get(str(host), {}) + job_status = rec.get("status") or "Pending" + ran_on = rec.get("started_ts") or rec.get("finished_ts") + activities = activities_by_run.get(rec.get("run_id", 0) or 0, []) + has_stdout = any(a.get("has_stdout") for a in activities) + has_stderr = any(a.get("has_stderr") for a in activities) + out.append({ + "hostname": str(host), + "online": str(host) in online, + "site": site_by_host.get(str(host), ""), + "ran_on": ran_on, + "job_status": job_status, + "has_stdout": has_stdout, + "has_stderr": has_stderr, + "activities": activities, + }) + + return json.dumps({"occurrence": occ, "devices": out}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + @app.route("/api/scheduled_jobs//runs", methods=["DELETE"]) + def api_scheduled_job_runs_clear(job_id: int): + """Clear all historical runs for a job except the most recent occurrence. + + We keep all rows that belong to the latest occurrence (by scheduled_ts) + and delete everything older. If there is no occurrence, no-op. + """ + try: + conn = self._conn() + cur = conn.cursor() + # Determine latest occurrence for this job + cur.execute( + "SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", + (job_id,) + ) + row = cur.fetchone() + latest = int(row[0]) if row and row[0] is not None else None + + if latest is None: + # Nothing to clear + conn.close() + return json.dumps({"status": "ok", "cleared": 0}), 200, {"Content-Type": "application/json"} + + # Delete all runs for older occurrences + cur.execute( + "DELETE FROM scheduled_job_runs WHERE job_id=? AND COALESCE(scheduled_ts, 0) < ?", + (job_id, latest), + ) + cleared = cur.rowcount or 0 + conn.commit() + conn.close() + return json.dumps({"status": "ok", "cleared": int(cleared), "kept_occurrence": latest}), 200, {"Content-Type": "application/json"} + except Exception as e: + return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} + + # ---------- Request helpers ---------- + def _json_body(self) -> Dict[str, Any]: + try: + from flask import request + return request.get_json(silent=True) or {} + except Exception: + return {} + + +def register(app, socketio, db_path: str, script_signer=None) -> JobScheduler: + """Factory to create and return a JobScheduler instance.""" + return JobScheduler(app, socketio, db_path, script_signer=script_signer) + + +def set_online_lookup(scheduler: JobScheduler, fn: Callable[[], List[str]]): + scheduler._online_lookup = fn + + +def set_server_ansible_runner(scheduler: JobScheduler, fn: Callable[..., str]): + scheduler._server_ansible_runner = fn + + +def set_credential_fetcher(scheduler: JobScheduler, fn: Callable[[int], Optional[Dict[str, Any]]]): + scheduler._credential_fetcher = fn diff --git a/Data/Engine/services/API/scheduled_jobs/management.py b/Data/Engine/services/API/scheduled_jobs/management.py index 3327aea8..66ca6518 100644 --- a/Data/Engine/services/API/scheduled_jobs/management.py +++ b/Data/Engine/services/API/scheduled_jobs/management.py @@ -19,8 +19,8 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any -try: # pragma: no cover - legacy module import guard - import job_scheduler as legacy_job_scheduler # type: ignore +try: # pragma: no cover - Engine-local legacy scheduler shim + from . import legacy_job_scheduler # type: ignore except Exception as exc: # pragma: no cover - runtime guard legacy_job_scheduler = None # type: ignore _SCHEDULER_IMPORT_ERROR = exc @@ -36,8 +36,8 @@ if TYPE_CHECKING: # pragma: no cover - typing aide def _raise_scheduler_import() -> None: if _SCHEDULER_IMPORT_ERROR is not None: raise RuntimeError( - "Legacy job scheduler module could not be imported; ensure Data/Server/job_scheduler.py " - "remains available during the Engine migration." + "Legacy job scheduler module could not be imported; ensure " + "Data/Engine/services/API/scheduled_jobs/legacy_job_scheduler.py remains available." ) from _SCHEDULER_IMPORT_ERROR @@ -79,4 +79,3 @@ def register_management(app: "Flask", adapters: "EngineServiceAdapters") -> None """Ensure scheduled job routes are registered via the legacy scheduler.""" ensure_scheduler(app, adapters) - diff --git a/readme.md b/readme.md index 746a1f63..1bd44ac7 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -![Borealis Logo](Data/Server/WebUI/public/Borealis_Logo_Full.png) +![Borealis Logo](Data/Engine/web-interface/public/Borealis_Logo_Full.png) Borealis is a remote management platform with a simple, visual automation layer, enabling you to leverage scripts, Ansible playbooks, and advanced nodegraph-based automation workflows. I originally created Borealis to work towards consolidating the core functionality of several standalone automation platforms in my homelab, such as TacticalRMM, Ansible AWX, SemaphoreUI, and a few others.