Added Functional Job Scheduling for Scripts

This commit is contained in:
2025-09-23 02:32:30 -06:00
parent 1b46f2eed6
commit d5c86425be
4 changed files with 347 additions and 134 deletions

View File

@@ -130,6 +130,93 @@ class JobScheduler:
# Bind routes
self._register_routes()
# ---------- Helpers for dispatching scripts ----------
def _scripts_root(self) -> str:
import os
return os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Scripts")
)
def _detect_script_type(self, filename: str) -> str:
fn = (filename or "").lower()
if fn.endswith(".yml"):
return "ansible"
if fn.endswith(".ps1"):
return "powershell"
if fn.endswith(".bat"):
return "batch"
if fn.endswith(".sh"):
return "bash"
return "unknown"
def _dispatch_script(self, hostname: str, rel_path: str, run_mode: str) -> None:
    """Emit a quick_job_run event to agents for the given script/host.

    Mirrors /api/scripts/quick_run behavior for scheduled jobs:
    validates that the script lives inside the Scripts root, records a
    "Running" row in activity_history, then broadcasts the script
    content over Socket.IO. All failures are swallowed deliberately so
    a bad script or DB hiccup cannot kill the scheduler loop.

    Args:
        hostname: Target agent hostname.
        rel_path: Script path relative to the Scripts root.
        run_mode: Execution context for the agent (defaults to "system").
    """
    try:
        import os
        scripts_root = self._scripts_root()
        path_norm = (rel_path or "").replace("\\", "/")
        abs_path = os.path.abspath(os.path.join(scripts_root, path_norm))
        # Containment check: the resolved path must be the root itself or
        # strictly inside it. A bare startswith(scripts_root) prefix test
        # would also accept sibling directories such as ".../Scripts2",
        # so compare against the root plus a path separator.
        inside = abs_path == scripts_root or abs_path.startswith(scripts_root + os.sep)
        if not inside or not os.path.isfile(abs_path):
            return
        stype = self._detect_script_type(abs_path)
        # For now, only PowerShell is supported by agents for scheduled jobs
        if stype != "powershell":
            return
        try:
            with open(abs_path, "r", encoding="utf-8", errors="replace") as fh:
                content = fh.read()
        except OSError:
            # Unreadable script: silently skip, matching quick_run parity.
            return
        # Insert into activity_history for device for parity with Quick Job
        import sqlite3
        now = _now_ts()
        act_id = None
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.cursor()
            cur.execute(
                """
                INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr)
                VALUES(?,?,?,?,?,?,?,?)
                """,
                (
                    str(hostname),
                    path_norm,
                    os.path.basename(abs_path),
                    stype,
                    now,
                    "Running",
                    "",
                    "",
                ),
            )
            act_id = cur.lastrowid
            conn.commit()
        finally:
            # Always release the connection, even if the INSERT failed.
            conn.close()
        payload = {
            "job_id": act_id,
            "target_hostname": str(hostname),
            "script_type": stype,
            "script_name": os.path.basename(abs_path),
            "script_path": path_norm,
            "script_content": content,
            "run_mode": (run_mode or "system").strip().lower(),
            "admin_user": "",
            "admin_pass": "",
        }
        try:
            self.socketio.emit("quick_job_run", payload)
        except Exception:
            # Best-effort broadcast; offline agents simply miss the event.
            pass
    except Exception:
        # Keep scheduler resilient
        pass
# ---------- DB helpers ----------
def _conn(self):
return sqlite3.connect(self.db_path)
@@ -310,7 +397,7 @@ class JobScheduler:
pass
try:
cur.execute(
"SELECT id, schedule_type, start_ts, enabled, expiration, targets_json, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC"
"SELECT id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC"
)
jobs = cur.fetchall()
except Exception:
@@ -328,7 +415,7 @@ class JobScheduler:
five_min = 300
now_min = _now_minute()
for (job_id, schedule_type, start_ts, enabled, expiration, targets_json, created_at) in jobs:
for (job_id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, created_at) in jobs:
try:
# Targets list for this job
try:
@@ -338,6 +425,22 @@ class JobScheduler:
targets = [str(t) for t in targets if isinstance(t, (str, int))]
total_targets = len(targets)
# Determine scripts to run for this job (first-pass: all 'script' components)
try:
comps = json.loads(components_json or "[]")
except Exception:
comps = []
script_paths = []
for c in comps:
try:
if (c or {}).get("type") == "script":
p = (c.get("path") or c.get("script_path") or "").strip()
if p:
script_paths.append(p)
except Exception:
continue
run_mode = (execution_context or "system").strip().lower()
exp_seconds = _parse_expiration(expiration)
# Determine current occurrence to work on
@@ -408,7 +511,7 @@ class JobScheduler:
except Exception:
row = None
if row:
# Existing record if Running, timeout handled earlier; skip
# Existing record - if Running, timeout handled earlier; skip
conn2.close()
continue
@@ -421,6 +524,12 @@ class JobScheduler:
(job_id, host, occ, ts_now, "Running", ts_now, ts_now),
)
conn2.commit()
# Dispatch all script components for this job to the target host
for sp in script_paths:
try:
self._dispatch_script(host, sp, run_mode)
except Exception:
continue
except Exception:
pass
finally:
@@ -877,13 +986,36 @@ class JobScheduler:
@app.route("/api/scheduled_jobs/<int:job_id>/runs", methods=["DELETE"])
def api_scheduled_job_runs_clear(job_id: int):
"""Clear all historical runs for a job except the most recent occurrence.
We keep all rows that belong to the latest occurrence (by scheduled_ts)
and delete everything older. If there is no occurrence, no-op.
"""
try:
conn = self._conn()
cur = conn.cursor()
cur.execute("DELETE FROM scheduled_job_runs WHERE job_id=?", (job_id,))
# Determine latest occurrence for this job
cur.execute(
"SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?",
(job_id,)
)
row = cur.fetchone()
latest = int(row[0]) if row and row[0] is not None else None
if latest is None:
# Nothing to clear
conn.close()
return json.dumps({"status": "ok", "cleared": 0}), 200, {"Content-Type": "application/json"}
# Delete all runs for older occurrences
cur.execute(
"DELETE FROM scheduled_job_runs WHERE job_id=? AND COALESCE(scheduled_ts, 0) < ?",
(job_id, latest),
)
cleared = cur.rowcount or 0
conn.commit()
conn.close()
return json.dumps({"status": "ok"}), 200, {"Content-Type": "application/json"}
return json.dumps({"status": "ok", "cleared": int(cleared), "kept_occurrence": latest}), 200, {"Content-Type": "application/json"}
except Exception as e:
return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"}