import os import time import json import os import base64 import re import sqlite3 from typing import Any, Dict, List, Optional, Tuple, Callable _WINRM_USERNAME_VAR = "__borealis_winrm_username" _WINRM_PASSWORD_VAR = "__borealis_winrm_password" _WINRM_TRANSPORT_VAR = "__borealis_winrm_transport" """ Job Scheduler module for Borealis Responsibilities: - Maintain an internal clock/loop that evaluates scheduled jobs - When a job's scheduled time arrives, mark it Running, then immediately Success (placeholder) - Track simple run history in a dedicated table - Expose job CRUD endpoints and computed scheduling metadata This module registers its Flask routes against the provided app and uses socketio.start_background_task to run the scheduler loop cooperatively with eventlet. """ def _now_ts() -> int: return int(time.time()) def _env_string(value: Any) -> str: if isinstance(value, bool): return "True" if value else "False" if value is None: return "" return str(value) def _decode_base64_text(value: Any) -> Optional[str]: if not isinstance(value, str): return None stripped = value.strip() if not stripped: return "" try: cleaned = re.sub(r"\s+", "", stripped) except Exception: cleaned = stripped try: decoded = base64.b64decode(cleaned, validate=True) except Exception: return None try: return decoded.decode("utf-8") except Exception: return decoded.decode("utf-8", errors="replace") def _inject_winrm_credential( base_values: Optional[Dict[str, Any]], credential: Optional[Dict[str, Any]], ) -> Dict[str, Any]: values: Dict[str, Any] = dict(base_values or {}) if not credential: return values username = str(credential.get("username") or "") password = str(credential.get("password") or "") metadata = credential.get("metadata") if isinstance(credential.get("metadata"), dict) else {} transport = metadata.get("winrm_transport") if isinstance(metadata, dict) else None transport_str = str(transport or "ntlm").strip().lower() or "ntlm" values[_WINRM_USERNAME_VAR] = username values[_WINRM_PASSWORD_VAR] = password values[_WINRM_TRANSPORT_VAR] = transport_str return values def _decode_script_content(value: Any, encoding_hint: str = "") -> str: encoding = (encoding_hint or "").strip().lower() if isinstance(value, str): if encoding in ("base64", "b64", "base-64"): decoded = _decode_base64_text(value) if decoded is not None: return decoded.replace("\r\n", "\n") decoded = _decode_base64_text(value) if decoded is not None: return decoded.replace("\r\n", "\n") return value.replace("\r\n", "\n") return "" def _encode_script_content(script_text: Any) -> str: if not isinstance(script_text, str): if script_text is None: script_text = "" else: script_text = str(script_text) normalized = script_text.replace("\r\n", "\n") if not normalized: return "" encoded = base64.b64encode(normalized.encode("utf-8")) return encoded.decode("ascii") def _canonical_env_key(name: Any) -> str: try: return re.sub(r"[^A-Za-z0-9_]", "_", str(name or "").strip()).upper() except Exception: return "" def _expand_env_aliases(env_map: Dict[str, str], variables: List[Dict[str, Any]]) -> Dict[str, str]: expanded: Dict[str, str] = dict(env_map or {}) if not isinstance(variables, list): return expanded for var in variables: if not isinstance(var, dict): continue name = str(var.get("name") or "").strip() if not name: continue canonical = _canonical_env_key(name) if not canonical or canonical not in expanded: continue value = expanded[canonical] alias = re.sub(r"[^A-Za-z0-9_]", "_", name) if alias and alias not in expanded: expanded[alias] = value if alias != name and re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name) and name not in expanded: expanded[name] = value return expanded def _powershell_literal(value: Any, var_type: str) -> str: typ = str(var_type or "string").lower() if typ == "boolean": if isinstance(value, bool): truthy = value elif value is None: truthy = False elif isinstance(value, (int, float)): truthy = value != 0 else: s = str(value).strip().lower() if s in {"true", "1", "yes", "y", "on"}: truthy = True elif s in {"false", "0", "no", "n", "off", ""}: truthy = False else: truthy = bool(s) return "$true" if truthy else "$false" if typ == "number": if value is None or value == "": return "0" return str(value) s = "" if value is None else str(value) return "'" + s.replace("'", "''") + "'" def _extract_variable_default(var: Dict[str, Any]) -> Any: for key in ("value", "default", "defaultValue", "default_value"): if key in var: val = var.get(key) return "" if val is None else val return "" def _prepare_variable_context(doc_variables: List[Dict[str, Any]], overrides: Dict[str, Any]): env_map: Dict[str, str] = {} variables: List[Dict[str, Any]] = [] literal_lookup: Dict[str, str] = {} doc_names: Dict[str, bool] = {} overrides = overrides or {} if not isinstance(doc_variables, list): doc_variables = [] for var in doc_variables: if not isinstance(var, dict): continue name = str(var.get("name") or "").strip() if not name: continue doc_names[name] = True canonical = _canonical_env_key(name) var_type = str(var.get("type") or "string").lower() default_val = _extract_variable_default(var) final_val = overrides[name] if name in overrides else default_val if canonical: env_map[canonical] = _env_string(final_val) literal_lookup[canonical] = _powershell_literal(final_val, var_type) if name in overrides: new_var = dict(var) new_var["value"] = overrides[name] variables.append(new_var) else: variables.append(var) for name, val in overrides.items(): if name in doc_names: continue canonical = _canonical_env_key(name) if canonical: env_map[canonical] = _env_string(val) literal_lookup[canonical] = _powershell_literal(val, "string") variables.append({"name": name, "value": val, "type": "string"}) env_map = _expand_env_aliases(env_map, variables) return env_map, variables, literal_lookup _ENV_VAR_PATTERN = re.compile(r"(?i)\$env:(\{)?([A-Za-z0-9_\-]+)(?(1)\})") def _rewrite_powershell_script(content: str, literal_lookup: Dict[str, str]) -> str: if not content or not literal_lookup: return content def _replace(match: Any) -> str: name = match.group(2) canonical = _canonical_env_key(name) if not canonical: return match.group(0) literal = literal_lookup.get(canonical) if literal is None: return match.group(0) return literal return _ENV_VAR_PATTERN.sub(_replace, content) def _parse_ts(val: Any) -> Optional[int]: """Best effort to parse ISO-ish datetime string or numeric seconds to epoch seconds.""" if val is None: return None if isinstance(val, (int, float)): return int(val) try: from datetime import datetime s = str(val).strip().replace("Z", "+00:00") dt = datetime.fromisoformat(s) return int(dt.timestamp()) except Exception: return None def _parse_expiration(s: Optional[str]) -> Optional[int]: """Parse expiration shorthand to seconds. Examples: '30m' -> 1800, '1h' -> 3600, '2d' -> 172800. Returns None for 'no_expire' or invalid input. """ if not s or s == "no_expire": return None try: s = s.strip().lower() unit = s[-1] value = int(s[:-1]) if unit == 'm': return value * 60 if unit == 'h': return value * 3600 if unit == 'd': return value * 86400 # Fallback: treat as minutes if only a number return int(s) * 60 except Exception: return None def _floor_minute(ts: int) -> int: ts = int(ts or 0) return ts - (ts % 60) def _now_minute() -> int: return _floor_minute(_now_ts()) def _add_months(dt_tuple: Tuple[int, int, int, int, int, int], months: int = 1) -> int: """Advance a date by N months and return epoch seconds. dt_tuple = (year, month, day, hour, minute, second) Handles month-end clamping. """ from calendar import monthrange from datetime import datetime y, m, d, hh, mm, ss = dt_tuple m2 = m + months y += (m2 - 1) // 12 m2 = ((m2 - 1) % 12) + 1 # Clamp day to last day of new month last_day = monthrange(y, m2)[1] d = min(d, last_day) try: return int(datetime(y, m2, d, hh, mm, ss).timestamp()) except Exception: # Fallback to first of month if something odd return int(datetime(y, m2, 1, hh, mm, ss).timestamp()) def _add_years(dt_tuple: Tuple[int, int, int, int, int, int], years: int = 1) -> int: from datetime import datetime y, m, d, hh, mm, ss = dt_tuple y += years # Handle Feb 29 -> Feb 28 if needed try: return int(datetime(y, m, d, hh, mm, ss).timestamp()) except Exception: # clamp day to 28 d2 = 28 if (m == 2 and d > 28) else 1 return int(datetime(y, m, d2, hh, mm, ss).timestamp()) def _to_dt_tuple(ts: int) -> Tuple[int, int, int, int, int, int]: from datetime import datetime dt = datetime.utcfromtimestamp(int(ts)) return (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second) class JobScheduler: def __init__(self, app, socketio, db_path: str): self.app = app self.socketio = socketio self.db_path = db_path self._running = False # Simulated run duration to hold jobs in "Running" before Success self.SIMULATED_RUN_SECONDS = int(os.environ.get("BOREALIS_SIM_RUN_SECONDS", "30")) # Retention for run history (days) self.RETENTION_DAYS = int(os.environ.get("BOREALIS_JOB_HISTORY_DAYS", "30")) # Callback to retrieve current set of online hostnames self._online_lookup: Optional[Callable[[], List[str]]] = None # Optional callback to execute Ansible directly from the server self._server_ansible_runner: Optional[Callable[..., str]] = None # Optional callback to fetch stored credentials (with decrypted secrets) self._credential_fetcher: Optional[Callable[[int], Optional[Dict[str, Any]]]] = None # Ensure run-history table exists self._init_tables() # Bind routes self._register_routes() # ---------- Helpers for dispatching scripts ---------- def _scripts_root(self) -> str: import os # Unified Assemblies root; script paths should include top-level # folder such as "Scripts" or "Ansible Playbooks". return os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies") ) def _is_valid_scripts_relpath(self, rel_path: str) -> bool: try: p = (rel_path or "").replace("\\", "/").lstrip("/") if not p: return False top = p.split("/", 1)[0] return top in ("Scripts",) except Exception: return False def _detect_script_type(self, filename: str) -> str: fn_lower = (filename or "").lower() if fn_lower.endswith(".json") and os.path.isfile(filename): try: with open(filename, "r", encoding="utf-8") as fh: data = json.load(fh) if isinstance(data, dict): typ = str(data.get("type") or data.get("script_type") or "").strip().lower() if typ in ("powershell", "batch", "bash", "ansible"): return typ except Exception: pass return "powershell" if fn_lower.endswith(".yml"): return "ansible" if fn_lower.endswith(".ps1"): return "powershell" if fn_lower.endswith(".bat"): return "batch" if fn_lower.endswith(".sh"): return "bash" return "unknown" def _load_assembly_document(self, abs_path: str, default_type: str) -> Dict[str, Any]: base_name = os.path.splitext(os.path.basename(abs_path))[0] doc: Dict[str, Any] = { "name": base_name, "description": "", "category": "application" if default_type == "ansible" else "script", "type": default_type, "script": "", "variables": [], "files": [], "timeout_seconds": 3600, } if abs_path.lower().endswith(".json") and os.path.isfile(abs_path): try: with open(abs_path, "r", encoding="utf-8") as fh: data = json.load(fh) except Exception: data = {} if isinstance(data, dict): doc["name"] = str(data.get("name") or doc["name"]) doc["description"] = str(data.get("description") or "") cat = str(data.get("category") or doc["category"]).strip().lower() if cat in ("application", "script"): doc["category"] = cat typ = str(data.get("type") or data.get("script_type") or default_type).strip().lower() if typ in ("powershell", "batch", "bash", "ansible"): doc["type"] = typ script_val = data.get("script") content_val = data.get("content") script_lines = data.get("script_lines") if isinstance(script_lines, list): try: doc["script"] = "\n".join(str(line) for line in script_lines) except Exception: doc["script"] = "" elif isinstance(script_val, str): doc["script"] = script_val else: if isinstance(content_val, str): doc["script"] = content_val encoding_hint = str(data.get("script_encoding") or data.get("scriptEncoding") or "").strip().lower() doc["script"] = _decode_script_content(doc.get("script"), encoding_hint) if encoding_hint in ("base64", "b64", "base-64"): doc["script_encoding"] = "base64" else: probe_source = "" if isinstance(script_val, str) and script_val: probe_source = script_val elif isinstance(content_val, str) and content_val: probe_source = content_val decoded_probe = _decode_base64_text(probe_source) if probe_source else None if decoded_probe is not None: doc["script_encoding"] = "base64" doc["script"] = decoded_probe.replace("\r\n", "\n") else: doc["script_encoding"] = "plain" try: timeout_raw = data.get("timeout_seconds", data.get("timeout")) if timeout_raw is None: doc["timeout_seconds"] = 3600 else: doc["timeout_seconds"] = max(0, int(timeout_raw)) except Exception: doc["timeout_seconds"] = 3600 vars_in = data.get("variables") if isinstance(data.get("variables"), list) else [] doc["variables"] = [] for v in vars_in: if not isinstance(v, dict): continue name = str(v.get("name") or v.get("key") or "").strip() if not name: continue vtype = str(v.get("type") or "string").strip().lower() if vtype not in ("string", "number", "boolean", "credential"): vtype = "string" doc["variables"].append({ "name": name, "label": str(v.get("label") or ""), "type": vtype, "default": v.get("default", v.get("default_value")), "required": bool(v.get("required")), "description": str(v.get("description") or ""), }) files_in = data.get("files") if isinstance(data.get("files"), list) else [] doc["files"] = [] for f in files_in: if not isinstance(f, dict): continue fname = f.get("file_name") or f.get("name") if not fname or not isinstance(f.get("data"), str): continue try: size_val = int(f.get("size") or 0) except Exception: size_val = 0 doc["files"].append({ "file_name": str(fname), "size": size_val, "mime_type": str(f.get("mime_type") or f.get("mimeType") or ""), "data": f.get("data"), }) return doc try: with open(abs_path, "r", encoding="utf-8", errors="replace") as fh: content = fh.read() except Exception: content = "" normalized_script = (content or "").replace("\r\n", "\n") doc["script"] = normalized_script return doc def _ansible_root(self) -> str: import os return os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Ansible_Playbooks") ) def _dispatch_ansible( self, hostname: str, component: Dict[str, Any], scheduled_job_id: int, scheduled_run_row_id: int, run_mode: str, credential_id: Optional[int] = None, use_service_account: bool = False, ) -> Optional[Dict[str, Any]]: try: import os, uuid ans_root = self._ansible_root() rel_path = "" overrides_map: Dict[str, Any] = {} if isinstance(component, dict): rel_path = component.get("path") or component.get("playbook_path") or component.get("script_path") or "" raw_overrides = component.get("variable_values") if isinstance(raw_overrides, dict): for key, val in raw_overrides.items(): name = str(key or "").strip() if not name: continue overrides_map[name] = val comp_vars = component.get("variables") if isinstance(comp_vars, list): for var in comp_vars: if not isinstance(var, dict): continue name = str(var.get("name") or "").strip() if not name or name in overrides_map: continue if "value" in var: overrides_map[name] = var.get("value") else: rel_path = str(component or "") rel_norm = (rel_path or "").replace("\\", "/").lstrip("/") abs_path = os.path.abspath(os.path.join(ans_root, rel_norm)) if (not abs_path.startswith(ans_root)) or (not os.path.isfile(abs_path)): return None doc = self._load_assembly_document(abs_path, "ansible") content = doc.get("script") or "" encoded_content = _encode_script_content(content) variables = doc.get("variables") or [] files = doc.get("files") or [] run_mode_norm = (run_mode or "system").strip().lower() server_run = run_mode_norm == "ssh" agent_winrm = run_mode_norm == "winrm" if agent_winrm and not use_service_account: if not credential_id: raise RuntimeError("WinRM execution requires a credential_id") if not callable(self._credential_fetcher): raise RuntimeError("Credential fetcher is not configured") cred_detail = self._credential_fetcher(int(credential_id)) if not cred_detail: raise RuntimeError("Credential not found") try: overrides_map = _inject_winrm_credential(overrides_map, cred_detail) finally: try: cred_detail.clear() # type: ignore[attr-defined] except Exception: pass # Record in activity_history for UI parity now = _now_ts() act_id = None conn = self._conn() cur = conn.cursor() try: cur.execute( """ INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr) VALUES(?,?,?,?,?,?,?,?) """, ( str(hostname), rel_norm, doc.get("name") or os.path.basename(abs_path), "ansible", now, "Running", "", "", ), ) act_id = cur.lastrowid conn.commit() finally: conn.close() if server_run: if not credential_id: raise RuntimeError("Remote execution requires a credential_id") if not callable(self._server_ansible_runner): raise RuntimeError("Server-side Ansible runner is not configured") try: self._server_ansible_runner( hostname=str(hostname), playbook_abs_path=abs_path, playbook_rel_path=rel_norm, playbook_name=doc.get("name") or os.path.basename(abs_path), credential_id=int(credential_id), variable_values=overrides_map, source="scheduled_job", activity_id=act_id, scheduled_job_id=scheduled_job_id, scheduled_run_id=scheduled_run_row_id, scheduled_job_run_row_id=scheduled_run_row_id, ) except Exception as exc: try: self.app.logger.warning( "[Scheduler] Server-side Ansible queue failed job=%s run=%s host=%s err=%s", scheduled_job_id, scheduled_run_row_id, hostname, exc, ) except Exception: print(f"[Scheduler] Server-side Ansible queue failed job={scheduled_job_id} host={hostname} err={exc}") if act_id: try: conn_fail = self._conn() cur_fail = conn_fail.cursor() cur_fail.execute( "UPDATE activity_history SET status='Failed', stderr=?, ran_at=? WHERE id=?", (str(exc), _now_ts(), act_id), ) conn_fail.commit() conn_fail.close() except Exception: pass raise else: payload = { "run_id": uuid.uuid4().hex, "target_hostname": str(hostname), "playbook_name": doc.get("name") or os.path.basename(abs_path), "playbook_content": encoded_content, "playbook_encoding": "base64", "activity_job_id": act_id, "scheduled_job_id": int(scheduled_job_id), "scheduled_run_id": int(scheduled_run_row_id), "connection": "winrm", "variables": variables, "files": files, "variable_values": overrides_map, } try: self.socketio.emit("ansible_playbook_run", payload) except Exception: pass if act_id: return { "activity_id": int(act_id), "component_name": doc.get("name") or os.path.basename(abs_path), "component_path": rel_norm, "script_type": "ansible", "component_kind": "ansible", } return None except Exception: pass def _dispatch_script(self, hostname: str, component: Dict[str, Any], run_mode: str) -> Optional[Dict[str, Any]]: """Emit a quick_job_run event to agents for the given script/host. Mirrors /api/scripts/quick_run behavior for scheduled jobs. """ try: scripts_root = self._scripts_root() import os rel_path_raw = "" if isinstance(component, dict): rel_path_raw = str(component.get("path") or component.get("script_path") or "") else: rel_path_raw = str(component or "") path_norm = (rel_path_raw or "").replace("\\", "/").strip() if path_norm and not path_norm.startswith("Scripts/"): path_norm = f"Scripts/{path_norm}" abs_path = os.path.abspath(os.path.join(scripts_root, path_norm)) if (not abs_path.startswith(scripts_root)) or (not self._is_valid_scripts_relpath(path_norm)) or (not os.path.isfile(abs_path)): return None doc = self._load_assembly_document(abs_path, "powershell") stype = (doc.get("type") or "powershell").lower() # For now, only PowerShell is supported by agents for scheduled jobs if stype != "powershell": return None content = doc.get("script") or "" doc_variables = doc.get("variables") if isinstance(doc.get("variables"), list) else [] overrides: Dict[str, Any] = {} if isinstance(component, dict): if isinstance(component.get("variable_values"), dict): for key, val in component.get("variable_values").items(): name = str(key or "").strip() if name: overrides[name] = val if isinstance(component.get("variables"), list): for var in component.get("variables"): if not isinstance(var, dict): continue name = str(var.get("name") or "").strip() if not name: continue if "value" in var: overrides[name] = var.get("value") env_map, variables, literal_lookup = _prepare_variable_context(doc_variables, overrides) content = _rewrite_powershell_script(content, literal_lookup) encoded_content = _encode_script_content(content) timeout_seconds = 0 try: timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0)) except Exception: timeout_seconds = 0 # Insert into activity_history for device for parity with Quick Job import sqlite3 now = _now_ts() act_id = None conn = sqlite3.connect(self.db_path) cur = conn.cursor() try: cur.execute( """ INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr) VALUES(?,?,?,?,?,?,?,?) """, ( str(hostname), path_norm, doc.get("name") or os.path.basename(abs_path), stype, now, "Running", "", "", ), ) act_id = cur.lastrowid conn.commit() finally: conn.close() payload = { "job_id": act_id, "target_hostname": str(hostname), "script_type": stype, "script_name": doc.get("name") or os.path.basename(abs_path), "script_path": path_norm, "script_content": encoded_content, "script_encoding": "base64", "environment": env_map, "variables": variables, "timeout_seconds": timeout_seconds, "files": doc.get("files") or [], "run_mode": (run_mode or "system").strip().lower(), "admin_user": "", "admin_pass": "", } try: self.socketio.emit("quick_job_run", payload) except Exception: pass if act_id: return { "activity_id": int(act_id), "component_name": doc.get("name") or os.path.basename(abs_path), "component_path": path_norm, "script_type": stype, "component_kind": "script", } return None except Exception: # Keep scheduler resilient pass return None # ---------- DB helpers ---------- def _conn(self): return sqlite3.connect(self.db_path) def set_credential_fetcher(self, fn: Optional[Callable[[int], Optional[Dict[str, Any]]]]): self._credential_fetcher = fn def _init_tables(self): conn = self._conn() cur = conn.cursor() # Runs table captures each firing cur.execute( """ CREATE TABLE IF NOT EXISTS scheduled_job_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_id INTEGER NOT NULL, scheduled_ts INTEGER, started_ts INTEGER, finished_ts INTEGER, status TEXT, error TEXT, created_at INTEGER, updated_at INTEGER, FOREIGN KEY(job_id) REFERENCES scheduled_jobs(id) ON DELETE CASCADE ) """ ) # Add per-target column if missing try: cur.execute("PRAGMA table_info(scheduled_job_runs)") cols = {row[1] for row in cur.fetchall()} if 'target_hostname' not in cols: cur.execute("ALTER TABLE scheduled_job_runs ADD COLUMN target_hostname TEXT") except Exception: pass # Helpful index for lookups try: cur.execute("CREATE INDEX IF NOT EXISTS idx_runs_job_sched_target ON scheduled_job_runs(job_id, scheduled_ts, target_hostname)") except Exception: pass try: cur.execute( """ CREATE TABLE IF NOT EXISTS scheduled_job_run_activity ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id INTEGER NOT NULL, activity_id INTEGER NOT NULL, component_kind TEXT, script_type TEXT, component_path TEXT, component_name TEXT, created_at INTEGER, FOREIGN KEY(run_id) REFERENCES scheduled_job_runs(id) ON DELETE CASCADE, FOREIGN KEY(activity_id) REFERENCES activity_history(id) ON DELETE CASCADE ) """ ) cur.execute("CREATE INDEX IF NOT EXISTS idx_run_activity_run ON scheduled_job_run_activity(run_id)") cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_run_activity_activity ON scheduled_job_run_activity(activity_id)") except Exception: pass conn.commit() conn.close() # ---------- Scheduling core ---------- def _get_last_run(self, job_id: int) -> Optional[Dict[str, Any]]: conn = self._conn() cur = conn.cursor() cur.execute( "SELECT id, scheduled_ts, started_ts, finished_ts, status FROM scheduled_job_runs WHERE job_id=? ORDER BY COALESCE(started_ts, scheduled_ts, 0) DESC, id DESC LIMIT 1", (job_id,) ) row = cur.fetchone() conn.close() if not row: return None return { "id": row[0], "scheduled_ts": row[1], "started_ts": row[2], "finished_ts": row[3], "status": row[4] or "", } def _compute_next_run(self, schedule_type: str, start_ts: Optional[int], last_run_ts: Optional[int], now_ts: int) -> Optional[int]: st = (schedule_type or "immediately").strip().lower() start_ts = _floor_minute(start_ts) if start_ts else None last_run_ts = _floor_minute(last_run_ts) if last_run_ts else None now_ts = _floor_minute(now_ts) if st == "immediately": # Run once asap if never ran return None if last_run_ts else now_ts if st == "once": if not start_ts: return None # If never ran and time in future/now if not last_run_ts: return start_ts return None if not start_ts: return None # For recurring types, base off start_ts and last_run_ts last = last_run_ts if last_run_ts else None # Minute/Hour intervals if st in ("every_5_minutes", "every_10_minutes", "every_15_minutes", "every_30_minutes", "every_hour"): period_map = { "every_5_minutes": 5 * 60, "every_10_minutes": 10 * 60, "every_15_minutes": 15 * 60, "every_30_minutes": 30 * 60, "every_hour": 60 * 60, } period = period_map.get(st) candidate = (last + period) if last else start_ts while candidate is not None and candidate <= now_ts - 1: candidate += period return candidate if st == "daily": period = 86400 candidate = last + period if last else start_ts while candidate is not None and candidate <= now_ts - 1: candidate += period return candidate if st == "weekly": period = 7 * 86400 candidate = last + period if last else start_ts while candidate is not None and candidate <= now_ts - 1: candidate += period return candidate if st == "monthly": base = _to_dt_tuple(last) if last else _to_dt_tuple(start_ts) candidate = _add_months(base, 1 if last else 0) while candidate <= now_ts - 1: base = _to_dt_tuple(candidate) candidate = _add_months(base, 1) return candidate if st == "yearly": base = _to_dt_tuple(last) if last else _to_dt_tuple(start_ts) candidate = _add_years(base, 1 if last else 0) while candidate <= now_ts - 1: base = _to_dt_tuple(candidate) candidate = _add_years(base, 1) return candidate return None def _should_expire(self, started_ts: Optional[int], expiration: Optional[str], now_ts: int) -> bool: if not started_ts: return False seconds = _parse_expiration(expiration) if not seconds: return False return (started_ts + seconds) <= now_ts def _tick_once(self): """Evaluate all enabled scheduled jobs and trigger those due. Placeholder execution: mark Running then Success immediately. """ now = _now_ts() conn = self._conn() cur = conn.cursor() # First, mark any stale Running runs that exceeded job expiration as Timed Out try: cur.execute( """ SELECT r.id, r.started_ts, j.expiration FROM scheduled_job_runs r JOIN scheduled_jobs j ON j.id = r.job_id WHERE r.status = 'Running' """ ) rows = cur.fetchall() for rid, started_ts, expiration in rows: if self._should_expire(started_ts, expiration, now): try: c2 = conn.cursor() c2.execute( "UPDATE scheduled_job_runs SET status='Timed Out', updated_at=? WHERE id=?", (now, rid), ) conn.commit() except Exception: pass except Exception: pass # Next, finalize any Running runs that have passed the simulated duration try: cur.execute( "SELECT id, started_ts FROM scheduled_job_runs WHERE status='Running'" ) rows = cur.fetchall() for rid, started_ts in rows: if started_ts and (int(started_ts) + self.SIMULATED_RUN_SECONDS) <= now: try: c2 = conn.cursor() c2.execute( "UPDATE scheduled_job_runs SET finished_ts=?, status='Success', updated_at=? WHERE id=?", (now, now, rid), ) conn.commit() except Exception: pass except Exception: pass # Finally, rotate history older than the retention window try: cutoff = now - (self.RETENTION_DAYS * 86400) cur.execute( "DELETE FROM scheduled_job_runs WHERE COALESCE(finished_ts, started_ts, scheduled_ts, 0) < ?", (cutoff,) ) conn.commit() except Exception: pass try: cur.execute( "SELECT id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, credential_id, use_service_account, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC" ) jobs = cur.fetchall() except Exception: jobs = [] conn.close() # Online hostnames snapshot for this tick online = set() try: if callable(self._online_lookup): online = set(self._online_lookup() or []) except Exception: online = set() five_min = 300 now_min = _now_minute() for ( job_id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, credential_id, use_service_account_flag, created_at, ) in jobs: try: # Targets list for this job try: targets = json.loads(targets_json or "[]") except Exception: targets = [] targets = [str(t) for t in targets if isinstance(t, (str, int))] total_targets = len(targets) # Determine scripts to run for this job (first-pass: all 'script' components) try: comps = json.loads(components_json or "[]") except Exception: comps = [] script_components = [] ansible_components = [] for c in comps: try: ctype = (c or {}).get("type") if ctype == "script": p = (c.get("path") or c.get("script_path") or "").strip() if p: comp_copy = dict(c) comp_copy["path"] = p script_components.append(comp_copy) elif ctype == "ansible": p = (c.get("path") or "").strip() if p: comp_copy = dict(c) comp_copy["path"] = p ansible_components.append(comp_copy) except Exception: continue run_mode = (execution_context or "system").strip().lower() job_credential_id = None job_use_service_account = bool(use_service_account_flag) if run_mode != "winrm": job_use_service_account = False try: job_credential_id = int(credential_id) if credential_id is not None else None except Exception: job_credential_id = None exp_seconds = _parse_expiration(expiration) # Determine current occurrence to work on occ = None try: conn2 = self._conn() c2 = conn2.cursor() c2.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (job_id,)) occ_from_runs = c2.fetchone()[0] conn2.close() except Exception: occ_from_runs = None if occ_from_runs: occ = int(occ_from_runs) # Check if occurrence is complete (terminal status per target) try: conn2 = self._conn() c2 = conn2.cursor() c2.execute( "SELECT COUNT(DISTINCT target_hostname) FROM scheduled_job_runs WHERE job_id=? AND scheduled_ts=? AND status IN ('Success','Expired','Timed Out')", (job_id, occ) ) done_count = int(c2.fetchone()[0] or 0) conn2.close() except Exception: done_count = 0 if total_targets > 0 and done_count >= total_targets: # Move to next occurrence if due in window nxt = self._compute_next_run(schedule_type, start_ts, occ, now_min) if nxt is not None and now_min >= nxt and (now_min - nxt) <= five_min: occ = nxt else: # Nothing to do this tick for this job continue else: # Continue working on this occurrence regardless of the 5-minute window until expiration pass else: # No occurrence yet; derive initial occurrence if (schedule_type or '').lower() == 'immediately': occ = _floor_minute(created_at or now_min) else: st_min = _floor_minute(start_ts) if start_ts else None if st_min is None: # Try compute_next_run to derive first planned occurrence occ = self._compute_next_run(schedule_type, start_ts, None, now_min) else: occ = st_min if occ is None: continue # For first occurrence, require it be within the 5-minute window to trigger if not (now_min >= occ and (now_min - occ) <= five_min): continue # For each target, if no run exists for this occurrence, either start it (if online) or keep waiting/expire for host in targets: # Skip if a run already exists for this job/host/occurrence try: conn2 = self._conn() c2 = conn2.cursor() c2.execute( "SELECT id, status, started_ts FROM scheduled_job_runs WHERE job_id=? AND target_hostname=? AND scheduled_ts=? ORDER BY id DESC LIMIT 1", (job_id, host, occ) ) row = c2.fetchone() except Exception: row = None if row: # Existing record - if Running, timeout handled earlier; skip conn2.close() continue # Start if online; otherwise, wait until expiration if host in online: try: ts_now = _now_ts() c2.execute( "INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, started_ts, status, created_at, updated_at) VALUES (?,?,?,?,?,?,?)", (job_id, host, occ, ts_now, "Running", ts_now, ts_now), ) run_row_id = c2.lastrowid or 0 conn2.commit() activity_links: List[Dict[str, Any]] = [] remote_requires_cred = (run_mode == "ssh") or (run_mode == "winrm" and not job_use_service_account) if remote_requires_cred and not job_credential_id: err_msg = "Credential required for remote execution" c2.execute( "UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?", (ts_now, ts_now, err_msg, run_row_id), ) conn2.commit() else: # Dispatch all script components for this job to the target host for comp in script_components: try: link = self._dispatch_script(host, comp, run_mode) if link and link.get("activity_id"): activity_links.append({ "run_id": run_row_id, "activity_id": int(link["activity_id"]), "component_kind": link.get("component_kind") or "script", "script_type": link.get("script_type") or "powershell", "component_path": link.get("component_path") or "", "component_name": link.get("component_name") or "", }) except Exception: continue # Dispatch ansible playbooks for this job to the target host for comp in ansible_components: try: link = self._dispatch_ansible( host, comp, job_id, run_row_id, run_mode, job_credential_id, job_use_service_account, ) if link and link.get("activity_id"): activity_links.append({ "run_id": run_row_id, "activity_id": int(link["activity_id"]), "component_kind": link.get("component_kind") or "ansible", "script_type": link.get("script_type") or "ansible", "component_path": link.get("component_path") or "", "component_name": link.get("component_name") or "", }) except Exception as exc: try: c2.execute( "UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?", (ts_now, ts_now, str(exc)[:512], run_row_id), ) conn2.commit() except Exception: pass continue if activity_links: try: for link in activity_links: c2.execute( "INSERT OR IGNORE INTO scheduled_job_run_activity(run_id, activity_id, component_kind, script_type, component_path, component_name, created_at) VALUES (?,?,?,?,?,?,?)", ( int(link["run_id"]), int(link["activity_id"]), link.get("component_kind") or "", link.get("script_type") or "", link.get("component_path") or "", link.get("component_name") or "", ts_now, ), ) conn2.commit() except Exception: pass except Exception: pass finally: conn2.close() else: # If expired window is configured and has passed, mark this host as Expired if exp_seconds is not None and (occ + exp_seconds) <= now: try: ts_now = _now_ts() c2.execute( "INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, finished_ts, status, error, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)", (job_id, host, occ, ts_now, "Expired", "Device offline", ts_now, ts_now), ) conn2.commit() except Exception: pass finally: conn2.close() else: # keep waiting; no record created yet try: conn2.close() except Exception: pass except Exception: # Keep loop resilient continue def start(self): if self._running: return self._running = True def _loop(): # cooperative loop aligned to minutes while self._running: try: self._tick_once() except Exception: pass # Sleep until next minute boundary delay = 60 - (_now_ts() % 60) try: import eventlet eventlet.sleep(delay) except Exception: time.sleep(delay) # Use SocketIO helper so it integrates with eventlet try: self.socketio.start_background_task(_loop) except Exception: # Fallback to thread import threading threading.Thread(target=_loop, daemon=True).start() # ---------- Route registration ---------- def _register_routes(self): app = self.app # Utility: job row def _job_row_to_dict(r) -> Dict[str, Any]: base = { "id": r[0], "name": r[1], "components": json.loads(r[2] or "[]"), "targets": json.loads(r[3] or "[]"), "schedule_type": r[4] or "immediately", "start_ts": r[5], "duration_stop_enabled": bool(r[6] or 0), "expiration": r[7] or "no_expire", "execution_context": r[8] or "system", "credential_id": r[9], "use_service_account": bool(r[10] or 0), "enabled": bool(r[11] or 0), "created_at": r[12] or 0, "updated_at": r[13] or 0, } # Attach computed status summary for latest occurrence try: conn = self._conn() c = conn.cursor() c.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (base["id"],)) max_occ = c.fetchone()[0] summary_status = None last_run_ts = None result_counts = { "pending": 0, "running": 0, "success": 0, "failed": 0, "expired": 0, "timed_out": 0, "total_targets": len(base.get("targets") or []), } if max_occ: # Summarize statuses for this occurrence c.execute( "SELECT status, COUNT(*) FROM scheduled_job_runs WHERE job_id=? AND scheduled_ts=? GROUP BY status", (base["id"], max_occ) ) counts = {row[0] or "": int(row[1] or 0) for row in c.fetchall()} result_counts["running"] = counts.get("Running", 0) result_counts["success"] = counts.get("Success", 0) result_counts["failed"] = counts.get("Failed", 0) result_counts["expired"] = counts.get("Expired", 0) result_counts["timed_out"] = counts.get("Timed Out", 0) computed = result_counts["running"] + result_counts["success"] + result_counts["failed"] + result_counts["expired"] + result_counts["timed_out"] pend = (result_counts["total_targets"] or 0) - computed result_counts["pending"] = pend if pend > 0 else 0 # Priority: Running > Timed Out > Expired > Success if counts.get("Running"): summary_status = "Running" elif counts.get("Timed Out"): summary_status = "Timed Out" elif counts.get("Expired"): summary_status = "Expired" elif counts.get("Success"): summary_status = "Success" last_run_ts = int(max_occ) conn.close() except Exception: summary_status = None last_run_ts = None result_counts = { "pending": len(base.get("targets") or []), "running": 0, "success": 0, "failed": 0, "expired": 0, "timed_out": 0, "total_targets": len(base.get("targets") or []), } base["last_run_ts"] = last_run_ts base["last_status"] = summary_status or ("Scheduled" if base.get("start_ts") else "") base["latest_occurrence"] = last_run_ts base["result_counts"] = result_counts try: base["next_run_ts"] = self._compute_next_run( base["schedule_type"], base.get("start_ts"), base.get("last_run_ts"), _now_ts() ) except Exception: base["next_run_ts"] = None return base @app.route("/api/scheduled_jobs", methods=["GET"]) def api_scheduled_jobs_list(): try: conn = self._conn() cur = conn.cursor() cur.execute( """ SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at FROM scheduled_jobs ORDER BY created_at DESC """ ) rows = [ _job_row_to_dict(r) for r in cur.fetchall() ] conn.close() return json.dumps({"jobs": rows}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs", methods=["POST"]) def api_scheduled_jobs_create(): data = self._json_body() name = (data.get("name") or "").strip() components = data.get("components") or [] targets = data.get("targets") or [] schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower() start = data.get("schedule", {}).get("start") or data.get("start") or None start_ts = _parse_ts(start) if start else None duration_stop_enabled = int(bool((data.get("duration") or {}).get("stopAfterEnabled") or data.get("duration_stop_enabled"))) expiration = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire" execution_context = (data.get("execution_context") or "system").strip().lower() credential_id = data.get("credential_id") try: credential_id = int(credential_id) if credential_id is not None else None except Exception: credential_id = None use_service_account_raw = data.get("use_service_account") use_service_account = 1 if (execution_context == "winrm" and (use_service_account_raw is None or bool(use_service_account_raw))) else 0 enabled = int(bool(data.get("enabled", True))) if not name or not components or not targets: return json.dumps({"error": "name, components, targets required"}), 400, {"Content-Type": "application/json"} now = _now_ts() try: conn = self._conn() cur = conn.cursor() cur.execute( """ INSERT INTO scheduled_jobs (name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) """, ( name, json.dumps(components), json.dumps(targets), schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, now, now, ), ) job_id = cur.lastrowid conn.commit() cur.execute( """ SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=? """, (job_id,), ) row = cur.fetchone() conn.close() return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs/", methods=["GET"]) def api_scheduled_jobs_get(job_id: int): try: conn = self._conn() cur = conn.cursor() cur.execute( """ SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=? """, (job_id,), ) row = cur.fetchone() conn.close() if not row: return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs/", methods=["PUT"]) def api_scheduled_jobs_update(job_id: int): data = self._json_body() fields: Dict[str, Any] = {} if "name" in data: fields["name"] = (data.get("name") or "").strip() if "components" in data: fields["components_json"] = json.dumps(data.get("components") or []) if "targets" in data: fields["targets_json"] = json.dumps(data.get("targets") or []) if "schedule" in data or "schedule_type" in data: schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower() fields["schedule_type"] = schedule_type start = data.get("schedule", {}).get("start") or data.get("start") or None fields["start_ts"] = _parse_ts(start) if start else None if "duration" in data or "duration_stop_enabled" in data: fields["duration_stop_enabled"] = int(bool((data.get("duration") or {}).get("stopAfterEnabled") or data.get("duration_stop_enabled"))) if "expiration" in data or (data.get("duration") and "expiration" in data.get("duration")): fields["expiration"] = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire" if "execution_context" in data: exec_ctx_val = (data.get("execution_context") or "system").strip().lower() fields["execution_context"] = exec_ctx_val if exec_ctx_val != "winrm": fields["use_service_account"] = 0 if "credential_id" in data: cred_val = data.get("credential_id") if cred_val in (None, "", "null"): fields["credential_id"] = None else: try: fields["credential_id"] = int(cred_val) except Exception: fields["credential_id"] = None if "use_service_account" in data: fields["use_service_account"] = 1 if bool(data.get("use_service_account")) else 0 if "enabled" in data: fields["enabled"] = int(bool(data.get("enabled"))) if not fields: return json.dumps({"error": "no fields to update"}), 400, {"Content-Type": "application/json"} try: conn = self._conn() cur = conn.cursor() sets = ", ".join([f"{k}=?" for k in fields.keys()]) params = list(fields.values()) + [_now_ts(), job_id] cur.execute(f"UPDATE scheduled_jobs SET {sets}, updated_at=? WHERE id=?", params) if cur.rowcount == 0: conn.close() return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} conn.commit() cur.execute( """ SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=? """, (job_id,), ) row = cur.fetchone() conn.close() return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs//toggle", methods=["POST"]) def api_scheduled_jobs_toggle(job_id: int): data = self._json_body() enabled = int(bool(data.get("enabled", True))) try: conn = self._conn() cur = conn.cursor() cur.execute("UPDATE scheduled_jobs SET enabled=?, updated_at=? WHERE id=?", (enabled, _now_ts(), job_id)) if cur.rowcount == 0: conn.close() return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} conn.commit() cur.execute( "SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, use_service_account, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=?", (job_id,), ) row = cur.fetchone() conn.close() return json.dumps({"job": _job_row_to_dict(row)}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs/", methods=["DELETE"]) def api_scheduled_jobs_delete(job_id: int): try: conn = self._conn() cur = conn.cursor() cur.execute("DELETE FROM scheduled_jobs WHERE id=?", (job_id,)) deleted = cur.rowcount conn.commit() conn.close() if deleted == 0: return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} return json.dumps({"status": "ok"}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs//runs", methods=["GET"]) def api_scheduled_job_runs(job_id: int): try: from flask import request days = request.args.get('days', default=str(self.RETENTION_DAYS)) try: days = max(1, int(days)) except Exception: days = self.RETENTION_DAYS cutoff = _now_ts() - (days * 86400) conn = self._conn() cur = conn.cursor() cur.execute( """ SELECT id, scheduled_ts, started_ts, finished_ts, status, error FROM scheduled_job_runs WHERE job_id=? AND COALESCE(finished_ts, started_ts, scheduled_ts, 0) >= ? ORDER BY COALESCE(started_ts, scheduled_ts, 0) DESC, id DESC LIMIT 500 """, (job_id, cutoff) ) rows = cur.fetchall() conn.close() runs = [ { "id": r[0], "scheduled_ts": r[1], "started_ts": r[2], "finished_ts": r[3], "status": r[4] or "", "error": r[5] or "", } for r in rows ] return json.dumps({"runs": runs}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs//devices", methods=["GET"]) def api_scheduled_job_devices(job_id: int): """Return per-target status for the latest occurrence (or specified via ?occurrence=epoch).""" try: from flask import request occurrence = request.args.get('occurrence') occ = int(occurrence) if occurrence else None conn = self._conn() cur = conn.cursor() cur.execute( "SELECT targets_json FROM scheduled_jobs WHERE id=?", (job_id,) ) row = cur.fetchone() if not row: conn.close() return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"} try: targets = json.loads(row[0] or "[]") except Exception: targets = [] targets = [str(t) for t in targets if isinstance(t, (str, int))] # Determine occurrence if not provided if occ is None: cur.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (job_id,)) occ_row = cur.fetchone() occ = int(occ_row[0]) if occ_row and occ_row[0] else None # Sites lookup site_by_host: Dict[str, str] = {} try: cur.execute( "SELECT device_hostname, sites.name FROM device_sites JOIN sites ON sites.id = device_sites.site_id" ) for h, n in cur.fetchall(): site_by_host[str(h)] = n or "" except Exception: pass # Status per target for occurrence run_by_host: Dict[str, Dict[str, Any]] = {} run_ids: List[int] = [] if occ is not None: try: cur.execute( "SELECT id, target_hostname, status, started_ts, finished_ts FROM scheduled_job_runs WHERE job_id=? AND scheduled_ts=? ORDER BY id DESC", (job_id, occ) ) rows = cur.fetchall() for rid, h, st, st_ts, fin_ts in rows: h = str(h) if h not in run_by_host: run_by_host[h] = { "status": st or "", "started_ts": st_ts, "finished_ts": fin_ts, "run_id": int(rid), } run_ids.append(int(rid)) except Exception: pass activities_by_run: Dict[int, List[Dict[str, Any]]] = {} if run_ids: try: placeholders = ",".join(["?"] * len(run_ids)) cur.execute( f""" SELECT s.run_id, s.activity_id, s.component_kind, s.script_type, s.component_path, s.component_name, COALESCE(LENGTH(h.stdout), 0), COALESCE(LENGTH(h.stderr), 0) FROM scheduled_job_run_activity s LEFT JOIN activity_history h ON h.id = s.activity_id WHERE s.run_id IN ({placeholders}) """, run_ids, ) for rid, act_id, kind, stype, path, name, so_len, se_len in cur.fetchall(): rid = int(rid) entry = { "activity_id": int(act_id), "component_kind": kind or "", "script_type": stype or "", "component_path": path or "", "component_name": name or "", "has_stdout": bool(so_len), "has_stderr": bool(se_len), } activities_by_run.setdefault(rid, []).append(entry) except Exception: pass conn.close() # Online snapshot online = set() try: if callable(self._online_lookup): online = set(self._online_lookup() or []) except Exception: online = set() out = [] for host in targets: rec = run_by_host.get(str(host), {}) job_status = rec.get("status") or "Pending" ran_on = rec.get("started_ts") or rec.get("finished_ts") activities = activities_by_run.get(rec.get("run_id", 0) or 0, []) has_stdout = any(a.get("has_stdout") for a in activities) has_stderr = any(a.get("has_stderr") for a in activities) out.append({ "hostname": str(host), "online": str(host) in online, "site": site_by_host.get(str(host), ""), "ran_on": ran_on, "job_status": job_status, "has_stdout": has_stdout, "has_stderr": has_stderr, "activities": activities, }) return json.dumps({"occurrence": occ, "devices": out}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} @app.route("/api/scheduled_jobs//runs", methods=["DELETE"]) def api_scheduled_job_runs_clear(job_id: int): """Clear all historical runs for a job except the most recent occurrence. We keep all rows that belong to the latest occurrence (by scheduled_ts) and delete everything older. If there is no occurrence, no-op. """ try: conn = self._conn() cur = conn.cursor() # Determine latest occurrence for this job cur.execute( "SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (job_id,) ) row = cur.fetchone() latest = int(row[0]) if row and row[0] is not None else None if latest is None: # Nothing to clear conn.close() return json.dumps({"status": "ok", "cleared": 0}), 200, {"Content-Type": "application/json"} # Delete all runs for older occurrences cur.execute( "DELETE FROM scheduled_job_runs WHERE job_id=? AND COALESCE(scheduled_ts, 0) < ?", (job_id, latest), ) cleared = cur.rowcount or 0 conn.commit() conn.close() return json.dumps({"status": "ok", "cleared": int(cleared), "kept_occurrence": latest}), 200, {"Content-Type": "application/json"} except Exception as e: return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} # ---------- Request helpers ---------- def _json_body(self) -> Dict[str, Any]: try: from flask import request return request.get_json(silent=True) or {} except Exception: return {} def register(app, socketio, db_path: str) -> JobScheduler: """Factory to create and return a JobScheduler instance.""" return JobScheduler(app, socketio, db_path) def set_online_lookup(scheduler: JobScheduler, fn: Callable[[], List[str]]): scheduler._online_lookup = fn def set_server_ansible_runner(scheduler: JobScheduler, fn: Callable[..., str]): scheduler._server_ansible_runner = fn def set_credential_fetcher(scheduler: JobScheduler, fn: Callable[[int], Optional[Dict[str, Any]]]): scheduler._credential_fetcher = fn