Added Job Scheduler Logic

This commit is contained in:
2025-09-23 00:02:39 -06:00
parent a81883ea04
commit 836b5783db
5 changed files with 1236 additions and 254 deletions

View File

@@ -20,6 +20,8 @@ import io
# Borealis Python API Endpoints
from Python_API_Endpoints.ocr_engines import run_ocr_on_base64
from Python_API_Endpoints.script_engines import run_powershell_script
from job_scheduler import register as register_job_scheduler
from job_scheduler import set_online_lookup as scheduler_set_online_lookup
# ---------------------------------------------
# Flask + WebSocket Server Configuration
@@ -1039,6 +1041,29 @@ def ensure_default_admin():
ensure_default_admin()
# ---------------------------------------------
# Scheduler Registration
# ---------------------------------------------
job_scheduler = register_job_scheduler(app, socketio, DB_PATH)
job_scheduler.start()
# Provide scheduler with online device lookup based on registered agents
def _online_hostnames_snapshot():
# Consider agent online if we saw collector activity within last 5 minutes
try:
now = time.time()
out = []
for rec in (registered_agents or {}).values():
host = rec.get('hostname')
last = float(rec.get('collector_active_ts') or 0)
if host and (now - last) <= 300:
out.append(str(host))
return out
except Exception:
return []
scheduler_set_online_lookup(job_scheduler, _online_hostnames_snapshot)
# ---------------------------------------------
# Sites API
# ---------------------------------------------
@@ -1485,236 +1510,10 @@ def get_agents():
return jsonify(out)
# ---------------------------------------------
# Scheduled Jobs API (basic CRUD/persistence only)
# ---------------------------------------------
def _job_row_to_dict(r):
return {
"id": r[0],
"name": r[1],
"components": json.loads(r[2] or "[]"),
"targets": json.loads(r[3] or "[]"),
"schedule_type": r[4] or "immediately",
"start_ts": r[5],
"duration_stop_enabled": bool(r[6] or 0),
"expiration": r[7] or "no_expire",
"execution_context": r[8] or "system",
"enabled": bool(r[9] or 0),
"created_at": r[10] or 0,
"updated_at": r[11] or 0,
}
"""Scheduled Jobs API moved to Data/Server/job_scheduler.py"""
@app.route("/api/scheduled_jobs", methods=["GET"]) # list
def api_scheduled_jobs_list():
try:
conn = _db_conn()
cur = conn.cursor()
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
FROM scheduled_jobs
ORDER BY created_at DESC
"""
)
rows = [ _job_row_to_dict(r) for r in cur.fetchall() ]
conn.close()
return jsonify({"jobs": rows})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scheduled_jobs", methods=["POST"]) # create
def api_scheduled_jobs_create():
data = request.get_json(silent=True) or {}
name = (data.get("name") or "").strip()
components = data.get("components") or []
targets = data.get("targets") or []
schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower()
start = data.get("schedule", {}).get("start") or data.get("start") or None
try:
start_ts = int(dayjs_to_ts(start)) if start else None
except Exception:
start_ts = None
duration_stop_enabled = int(bool((data.get("duration") or {}).get("stopAfterEnabled") or data.get("duration_stop_enabled")))
expiration = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire"
execution_context = (data.get("execution_context") or "system").strip().lower()
enabled = int(bool(data.get("enabled", True)))
if not name or not components or not targets:
return jsonify({"error": "name, components, targets required"}), 400
now = _now_ts()
try:
conn = _db_conn()
cur = conn.cursor()
cur.execute(
"""
INSERT INTO scheduled_jobs
(name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
""",
(
name,
json.dumps(components),
json.dumps(targets),
schedule_type,
start_ts,
duration_stop_enabled,
expiration,
execution_context,
enabled,
now,
now,
),
)
job_id = cur.lastrowid
conn.commit()
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
FROM scheduled_jobs WHERE id=?
""",
(job_id,),
)
row = cur.fetchone()
conn.close()
return jsonify({"job": _job_row_to_dict(row)})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scheduled_jobs/<int:job_id>", methods=["GET"]) # get
def api_scheduled_jobs_get(job_id: int):
try:
conn = _db_conn()
cur = conn.cursor()
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
FROM scheduled_jobs WHERE id=?
""",
(job_id,),
)
row = cur.fetchone()
conn.close()
if not row:
return jsonify({"error": "not found"}), 404
return jsonify({"job": _job_row_to_dict(row)})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scheduled_jobs/<int:job_id>", methods=["PUT"]) # update
def api_scheduled_jobs_update(job_id: int):
data = request.get_json(silent=True) or {}
fields = {}
if "name" in data:
fields["name"] = (data.get("name") or "").strip()
if "components" in data:
fields["components_json"] = json.dumps(data.get("components") or [])
if "targets" in data:
fields["targets_json"] = json.dumps(data.get("targets") or [])
if "schedule" in data or "schedule_type" in data:
schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower()
fields["schedule_type"] = schedule_type
start = data.get("schedule", {}).get("start") or data.get("start") or None
try:
fields["start_ts"] = int(dayjs_to_ts(start)) if start else None
except Exception:
fields["start_ts"] = None
if "duration" in data or "duration_stop_enabled" in data:
fields["duration_stop_enabled"] = int(bool((data.get("duration") or {}).get("stopAfterEnabled") or data.get("duration_stop_enabled")))
if "expiration" in data or (data.get("duration") and "expiration" in data.get("duration")):
fields["expiration"] = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire"
if "execution_context" in data:
fields["execution_context"] = (data.get("execution_context") or "system").strip().lower()
if "enabled" in data:
fields["enabled"] = int(bool(data.get("enabled")))
if not fields:
return jsonify({"error": "no fields to update"}), 400
try:
conn = _db_conn()
cur = conn.cursor()
sets = ", ".join([f"{k}=?" for k in fields.keys()])
params = list(fields.values()) + [_now_ts(), job_id]
cur.execute(f"UPDATE scheduled_jobs SET {sets}, updated_at=? WHERE id=?", params)
if cur.rowcount == 0:
conn.close()
return jsonify({"error": "not found"}), 404
conn.commit()
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
FROM scheduled_jobs WHERE id=?
""",
(job_id,),
)
row = cur.fetchone()
conn.close()
return jsonify({"job": _job_row_to_dict(row)})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scheduled_jobs/<int:job_id>/toggle", methods=["POST"]) # toggle enabled
def api_scheduled_jobs_toggle(job_id: int):
data = request.get_json(silent=True) or {}
enabled = int(bool(data.get("enabled", True)))
try:
conn = _db_conn()
cur = conn.cursor()
cur.execute("UPDATE scheduled_jobs SET enabled=?, updated_at=? WHERE id=?", (enabled, _now_ts(), job_id))
if cur.rowcount == 0:
conn.close()
return jsonify({"error": "not found"}), 404
conn.commit()
cur.execute(
"SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=?",
(job_id,),
)
row = cur.fetchone()
conn.close()
return jsonify({"job": _job_row_to_dict(row)})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scheduled_jobs/<int:job_id>", methods=["DELETE"]) # delete
def api_scheduled_jobs_delete(job_id: int):
try:
conn = _db_conn()
cur = conn.cursor()
cur.execute("DELETE FROM scheduled_jobs WHERE id=?", (job_id,))
deleted = cur.rowcount
conn.commit()
conn.close()
if deleted == 0:
return jsonify({"error": "not found"}), 404
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
def dayjs_to_ts(val):
"""Convert various ISO-ish datetime strings to epoch seconds."""
if val is None:
return None
if isinstance(val, (int, float)):
# assume seconds
return int(val)
try:
# Val may be ISO string; let Python parse
from datetime import datetime
# Ensure Z stripped or present
s = str(val).replace("Z", "+00:00")
dt = datetime.fromisoformat(s)
return int(dt.timestamp())
except Exception:
return None
## dayjs_to_ts removed; scheduling parsing now lives in job_scheduler
@app.route("/api/agent/details", methods=["POST"])