Mirror of https://github.com/bunny-lab-io/Borealis.git
Additional Job Scheduling Fix Work
@@ -51,7 +51,16 @@ def _build_runtime_config() -> Dict[str, Any]:
if api_groups_override:
api_groups: Any = api_groups_override
else:
api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs")
api_groups = (
"core",
"auth",
"tokens",
"enrollment",
"devices",
"server",
"assemblies",
"scheduled_jobs",
)

return {
"HOST": os.environ.get("BOREALIS_ENGINE_HOST", DEFAULT_HOST),

@@ -283,7 +283,16 @@ def load_runtime_config(overrides: Optional[Mapping[str, Any]] = None) -> Engine
runtime_config.get("API_GROUPS") or os.environ.get("BOREALIS_API_GROUPS")
)
if not api_groups:
api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies")
api_groups = (
"core",
"auth",
"tokens",
"enrollment",
"devices",
"server",
"assemblies",
"scheduled_jobs",
)

settings = EngineSettings(
database_path=database_path,

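The two hunks above add "server" (and, in the second hunk, "scheduled_jobs") to the default API group tuples. For context, a minimal sketch of how a comma-separated BOREALIS_API_GROUPS override could be normalised into the same tuple shape; the helper name and the comma-separated format are assumptions for illustration, not taken from the repository.

import os
from typing import Tuple

# Hypothetical defaults mirroring the tuple added in the diff above.
DEFAULT_GROUPS: Tuple[str, ...] = (
    "core", "auth", "tokens", "enrollment",
    "devices", "server", "assemblies", "scheduled_jobs",
)

def resolve_api_groups() -> Tuple[str, ...]:
    # Assumption: the override is a comma-separated list such as "core,auth,server".
    raw = os.environ.get("BOREALIS_API_GROUPS", "")
    groups = tuple(part.strip() for part in raw.split(",") if part.strip())
    return groups or DEFAULT_GROUPS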
@@ -41,8 +41,9 @@ from .devices import routes as device_routes
from .devices.approval import register_admin_endpoints
from .devices.management import register_management
from .scheduled_jobs import management as scheduled_jobs_management
from .server import info as server_info

DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs")
DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "server", "assemblies", "scheduled_jobs")

_SERVER_SCOPE_PATTERN = re.compile(r"\b(?:scope|context|agent_context)=([A-Za-z0-9_-]+)", re.IGNORECASE)
_SERVER_AGENT_ID_PATTERN = re.compile(r"\bagent_id=([^\s,]+)", re.IGNORECASE)
@@ -274,11 +275,16 @@ def _register_assemblies(app: Flask, adapters: EngineServiceAdapters) -> None:
register_execution(app, adapters)


def _register_server(app: Flask, adapters: EngineServiceAdapters) -> None:
server_info.register_info(app, adapters)


_GROUP_REGISTRARS: Mapping[str, Callable[[Flask, EngineServiceAdapters], None]] = {
"auth": register_auth,
"tokens": _register_tokens,
"enrollment": _register_enrollment,
"devices": _register_devices,
"server": _register_server,
"assemblies": _register_assemblies,
"scheduled_jobs": _register_scheduled_jobs,
}

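The new "server" entry in _GROUP_REGISTRARS ties the group name to its registrar, so its routes are only bound when the group is enabled. A minimal sketch of how such a mapping is typically applied to an enabled-group list; the loop below is an illustration under that assumption, not the repository's actual registration code.

from typing import Callable, Mapping, Sequence

def register_groups(app, adapters, registrars: Mapping[str, Callable], groups: Sequence[str]) -> None:
    # Walk the enabled groups and invoke each registrar that exists in the mapping.
    for group in groups:
        registrar = registrars.get(group)
        if registrar is None:
            continue  # unknown group names are simply skipped in this sketch
        registrar(app, adapters)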
@@ -11,6 +11,7 @@ from __future__ import annotations

import base64
import json
import logging
import os
import re
import sqlite3
@@ -318,12 +319,13 @@ def _to_dt_tuple(ts: int) -> Tuple[int, int, int, int, int, int]:


class JobScheduler:
def __init__(self, app, socketio, db_path: str, script_signer=None):
def __init__(self, app, socketio, db_path: str, script_signer=None, service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None):
self.app = app
self.socketio = socketio
self.db_path = db_path
self._script_signer = script_signer
self._running = False
self._service_log = service_logger
# Simulated run duration to hold jobs in "Running" before Success
self.SIMULATED_RUN_SECONDS = int(os.environ.get("BOREALIS_SIM_RUN_SECONDS", "30"))
# Retention for run history (days)
@@ -341,6 +343,42 @@ class JobScheduler:
# Bind routes
self._register_routes()

def _log_event(
self,
message: str,
*,
level: str = "INFO",
job_id: Optional[int] = None,
host: Optional[str] = None,
run_id: Optional[int] = None,
extra: Optional[Dict[str, Any]] = None,
) -> None:
fragments: List[str] = []
if job_id is not None:
fragments.append(f"job={job_id}")
if run_id is not None:
fragments.append(f"run={run_id}")
if host:
fragments.append(f"host={host}")
if extra:
for key, value in extra.items():
fragments.append(f"{key}={value}")
payload = message
if fragments:
payload = f"{message} | " + " ".join(str(fragment) for fragment in fragments)
scope = f"job-{job_id}" if job_id is not None else None
try:
if callable(self._service_log):
self._service_log("scheduled_jobs", payload, scope=scope, level=level)
return
except Exception:
pass
try:
numeric_level = getattr(logging, level.upper(), logging.INFO)
self.app.logger.log(numeric_level, "[Scheduler] %s", payload)
except Exception:
pass

# ---------- Helpers for dispatching scripts ----------
def _scripts_root(self) -> str:
import os
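The new _log_event helper prefers the injected service_logger and falls back to app.logger. Note that although the parameter is annotated as Callable[[str, str, Optional[str]], None], the call site passes scope= and level= as keywords, so a real implementation must accept those keyword arguments. A minimal sketch of a compatible callable; the print-based sink is illustrative only, since the Engine's actual service_log implementation is not shown in this diff.

from typing import Optional

def demo_service_logger(service: str, message: str, scope: Optional[str] = None, level: str = "INFO") -> None:
    # Matches the call shape used by _log_event above: two positional args plus
    # scope= and level= keywords. Replace print with any real log sink.
    prefix = f"[{service}]" + (f"[{scope}]" if scope else "")
    print(f"{level} {prefix} {message}")

# Usage sketch (app, socketio and db_path come from the hosting runtime):
# scheduler = JobScheduler(app, socketio, db_path, service_logger=demo_service_logger)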
@@ -975,20 +1013,24 @@ class JobScheduler:
Placeholder execution: mark Running then Success immediately.
"""
now = _now_ts()
self._log_event("tick begin", extra={"now_ts": now})
conn = self._conn()
cur = conn.cursor()
# First, mark any stale Running runs that exceeded job expiration as Timed Out
try:
cur.execute(
"""
SELECT r.id, r.started_ts, j.expiration
SELECT r.id, r.job_id, r.target_hostname, r.started_ts, j.expiration
FROM scheduled_job_runs r
JOIN scheduled_jobs j ON j.id = r.job_id
WHERE r.status = 'Running'
"""
)
rows = cur.fetchall()
for rid, started_ts, expiration in rows:
self._log_event(
"evaluating running runs for expiration",
extra={"running_count": len(rows), "now_ts": now},
)
for rid, row_job_id, row_host, started_ts, expiration in rows:
if self._should_expire(started_ts, expiration, now):
try:
c2 = conn.cursor()
@@ -997,18 +1039,40 @@ class JobScheduler:
(now, rid),
)
conn.commit()
except Exception:
pass
except Exception:
pass
self._log_event(
"marked run as timed out due to expiration window",
level="WARNING",
job_id=row_job_id,
host=row_host,
run_id=rid,
extra={"started_ts": started_ts, "expiration": expiration},
)
except Exception as exc:
self._log_event(
"failed to mark run as timed out",
job_id=row_job_id,
host=row_host,
run_id=rid,
level="ERROR",
extra={"error": str(exc)},
)
except Exception as exc:
self._log_event(
"failed to evaluate running runs for expiration",
level="ERROR",
extra={"error": str(exc)},
)

# Next, finalize any Running runs that have passed the simulated duration
try:
cur.execute(
"SELECT id, started_ts FROM scheduled_job_runs WHERE status='Running'"
"SELECT id, job_id, target_hostname, started_ts FROM scheduled_job_runs WHERE status='Running'"
)
rows = cur.fetchall()
for rid, started_ts in rows:
self._log_event(
"evaluating running runs for simulated completion",
extra={"running_count": len(rows), "now_ts": now, "simulated_window": self.SIMULATED_RUN_SECONDS},
)
for rid, row_job_id, row_host, started_ts in rows:
if started_ts and (int(started_ts) + self.SIMULATED_RUN_SECONDS) <= now:
try:
c2 = conn.cursor()
@@ -1017,12 +1081,29 @@ class JobScheduler:
(now, now, rid),
)
conn.commit()
except Exception:
pass
except Exception:
pass
self._log_event(
"auto-completed simulated run",
job_id=row_job_id,
host=row_host,
run_id=rid,
extra={"started_ts": started_ts},
)
except Exception as exc:
self._log_event(
"failed to auto-complete simulated run",
job_id=row_job_id,
host=row_host,
run_id=rid,
level="ERROR",
extra={"error": str(exc)},
)
except Exception as exc:
self._log_event(
"failed to auto-complete simulated runs",
level="ERROR",
extra={"error": str(exc)},
)

# Finally, rotate history older than the retention window
try:
cutoff = now - (self.RETENTION_DAYS * 86400)
cur.execute(
@@ -1030,26 +1111,64 @@ class JobScheduler:
(cutoff,)
)
conn.commit()
except Exception:
pass
self._log_event(
"purged scheduled_job_runs history older than retention window",
extra={"cutoff_ts": cutoff, "retention_days": self.RETENTION_DAYS},
)
except Exception as exc:
self._log_event(
"failed to purge scheduled_job_runs history",
level="ERROR",
extra={"error": str(exc)},
)

try:
cur.execute(
"SELECT id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, credential_id, use_service_account, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC"
)
jobs = cur.fetchall()
except Exception:
self._log_event(
"loaded enabled scheduled jobs for tick",
extra={"job_count": len(jobs)},
)
except Exception as exc:
jobs = []
conn.close()
self._log_event(
"failed to load enabled scheduled jobs for tick",
level="ERROR",
extra={"error": str(exc)},
)
finally:
try:
conn.close()
except Exception:
pass

# Online hostnames snapshot for this tick
online = set()
try:
if callable(self._online_lookup):
online = set(self._online_lookup() or [])
except Exception:
except Exception as exc:
self._log_event(
"failed to gather online host snapshot",
level="ERROR",
extra={"error": str(exc)},
)
online = set()

five_min = 300
snapshot_hosts: List[str] = []
try:
snapshot_hosts = sorted(list(online))[:20]
except Exception:
snapshot_hosts = list(online)
self._log_event(
"online host snapshot acquired",
extra={
"online_count": len(online),
"hosts": ",".join(snapshot_hosts),
},
)

now_min = _now_minute()

for (
@@ -1065,19 +1184,29 @@ class JobScheduler:
created_at,
) in jobs:
try:
# Targets list for this job
try:
targets = json.loads(targets_json or "[]")
except Exception:
except Exception as exc:
targets = []
self._log_event(
"failed to parse targets JSON for job",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)
targets = [str(t) for t in targets if isinstance(t, (str, int))]
total_targets = len(targets)

# Determine scripts to run for this job (first-pass: all 'script' components)
try:
comps = json.loads(components_json or "[]")
except Exception:
except Exception as exc:
comps = []
self._log_event(
"failed to parse components JSON for job",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)
script_components = []
ansible_components = []
for c in comps:
@@ -1095,7 +1224,13 @@ class JobScheduler:
comp_copy = dict(c)
comp_copy["path"] = p
ansible_components.append(comp_copy)
except Exception:
except Exception as exc:
self._log_event(
"failed to normalise component entry",
job_id=job_id,
level="ERROR",
extra={"error": str(exc), "component": str(c)},
)
continue
run_mode = (execution_context or "system").strip().lower()
job_credential_id = None
@@ -1104,25 +1239,67 @@ class JobScheduler:
job_use_service_account = False
try:
job_credential_id = int(credential_id) if credential_id is not None else None
except Exception:
except Exception as exc:
job_credential_id = None
self._log_event(
"failed to parse credential id for job",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)

self._log_event(
"job snapshot for tick evaluation",
job_id=job_id,
extra={
"schedule_type": schedule_type,
"start_ts": start_ts,
"expiration": expiration,
"run_mode": run_mode,
"targets": total_targets,
"script_components": len(script_components),
"ansible_components": len(ansible_components),
"use_service_account": job_use_service_account,
"credential_id": job_credential_id,
"current_minute": now_min,
},
)

if total_targets == 0:
self._log_event("job skipped due to zero targets", job_id=job_id)
continue
if not script_components and not ansible_components:
self._log_event("job skipped due to no runnable components", job_id=job_id)
continue

exp_seconds = _parse_expiration(expiration)

# Determine current occurrence to work on
occ = None
occ_from_runs = None
conn2 = None
try:
conn2 = self._conn()
c2 = conn2.cursor()
c2.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (job_id,))
occ_from_runs = c2.fetchone()[0]
conn2.close()
except Exception:
occ_from_runs = None
except Exception as exc:
self._log_event(
"failed to fetch last occurrence for job",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)
finally:
try:
if conn2:
conn2.close()
except Exception:
pass

if occ_from_runs:
occ = int(occ_from_runs)
# Check if occurrence is complete (terminal status per target)
done_count = 0
conn2 = None
try:
conn2 = self._conn()
c2 = conn2.cursor()
@@ -1131,41 +1308,83 @@ class JobScheduler:
(job_id, occ)
)
done_count = int(c2.fetchone()[0] or 0)
conn2.close()
except Exception:
except Exception as exc:
self._log_event(
"failed to count completed targets for occurrence",
job_id=job_id,
level="ERROR",
extra={"error": str(exc), "occurrence_ts": occ},
)
done_count = 0
finally:
try:
if conn2:
conn2.close()
except Exception:
pass

self._log_event(
"existing occurrence state evaluated",
job_id=job_id,
extra={
"occurrence_ts": occ,
"completed_targets": done_count,
"total_targets": total_targets,
},
)

if total_targets > 0 and done_count >= total_targets:
# Move to next occurrence if due in window
nxt = self._compute_next_run(schedule_type, start_ts, occ, now_min)
if nxt is not None and now_min >= nxt and (now_min - nxt) <= five_min:
if nxt is not None and now_min >= nxt:
self._log_event(
"advancing to next occurrence",
job_id=job_id,
extra={"next_occurrence": nxt, "previous_occurrence": occ, "now_minute": now_min},
)
occ = nxt
else:
# Nothing to do this tick for this job
self._log_event(
"no action for job this tick; next occurrence not due",
job_id=job_id,
extra={
"candidate_occurrence": nxt,
"current_occurrence": occ,
"now_minute": now_min,
},
)
continue
else:
# Continue working on this occurrence regardless of the 5-minute window until expiration
pass
self._log_event(
"continuing current occurrence",
job_id=job_id,
extra={"occurrence_ts": occ, "completed_targets": done_count, "total_targets": total_targets},
)
else:
# No occurrence yet; derive initial occurrence
if (schedule_type or '').lower() == 'immediately':
if (schedule_type or "").lower() == "immediately":
occ = _floor_minute(created_at or now_min)
else:
st_min = _floor_minute(start_ts) if start_ts else None
if st_min is None:
# Try compute_next_run to derive first planned occurrence
occ = self._compute_next_run(schedule_type, start_ts, None, now_min)
else:
occ = st_min
if occ is None:
self._log_event(
"unable to determine initial occurrence for job; skipping",
job_id=job_id,
extra={"schedule_type": schedule_type, "start_ts": start_ts},
)
continue
# For first occurrence, require it be within the 5-minute window to trigger
if not (now_min >= occ and (now_min - occ) <= five_min):
if now_min < occ:
self._log_event(
"job occurrence scheduled in the future; waiting",
job_id=job_id,
extra={"scheduled_minute": occ, "current_minute": now_min},
)
continue

# For each target, if no run exists for this occurrence, either start it (if online) or keep waiting/expire
for host in targets:
# Skip if a run already exists for this job/host/occurrence
conn2 = None
try:
conn2 = self._conn()
c2 = conn2.cursor()
@@ -1174,16 +1393,17 @@ class JobScheduler:
(job_id, host, occ)
)
row = c2.fetchone()
except Exception:
row = None
if row:
# Existing record - if Running, timeout handled earlier; skip
conn2.close()
continue
if row:
self._log_event(
"run already exists for host and occurrence; skipping new dispatch",
job_id=job_id,
host=host,
run_id=row[0],
extra={"status": row[1], "scheduled_ts": occ},
)
continue

# Start if online; otherwise, wait until expiration
if host in online:
try:
if host in online:
ts_now = _now_ts()
c2.execute(
"INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, started_ts, status, created_at, updated_at) VALUES (?,?,?,?,?,?,?)",
@@ -1191,7 +1411,18 @@ class JobScheduler:
)
run_row_id = c2.lastrowid or 0
conn2.commit()
activity_links: List[Dict[str, Any]] = []
self._log_event(
"created run record for host",
job_id=job_id,
host=host,
run_id=run_row_id,
extra={
"scheduled_ts": occ,
"run_mode": run_mode,
"component_count": len(script_components) + len(ansible_components),
},
)
activity_links = []
remote_requires_cred = (run_mode == "ssh") or (run_mode == "winrm" and not job_use_service_account)
if remote_requires_cred and not job_credential_id:
err_msg = "Credential required for remote execution"
@@ -1200,10 +1431,27 @@ class JobScheduler:
(ts_now, ts_now, err_msg, run_row_id),
)
conn2.commit()
self._log_event(
"run failed immediately due to missing credential",
job_id=job_id,
host=host,
run_id=run_row_id,
level="ERROR",
extra={"run_mode": run_mode},
)
else:
# Dispatch all script components for this job to the target host
for comp in script_components:
try:
self._log_event(
"dispatching script component to host",
job_id=job_id,
host=host,
run_id=run_row_id,
extra={
"component_path": comp.get("path"),
"run_mode": run_mode,
},
)
link = self._dispatch_script(host, comp, run_mode)
if link and link.get("activity_id"):
activity_links.append({
@@ -1214,11 +1462,41 @@ class JobScheduler:
"component_path": link.get("component_path") or "",
"component_name": link.get("component_name") or "",
})
except Exception:
self._log_event(
"script component dispatched successfully",
job_id=job_id,
host=host,
run_id=run_row_id,
extra={
"component_path": link.get("component_path"),
"activity_id": link.get("activity_id"),
},
)
except Exception as exc:
self._log_event(
"script component dispatch failed",
job_id=job_id,
host=host,
run_id=run_row_id,
level="ERROR",
extra={
"component_path": comp.get("path"),
"error": str(exc),
},
)
continue
# Dispatch ansible playbooks for this job to the target host
for comp in ansible_components:
try:
self._log_event(
"dispatching ansible component to host",
job_id=job_id,
host=host,
run_id=run_row_id,
extra={
"component_path": comp.get("path"),
"run_mode": run_mode,
},
)
link = self._dispatch_ansible(
host,
comp,
@@ -1237,15 +1515,43 @@ class JobScheduler:
"component_path": link.get("component_path") or "",
"component_name": link.get("component_name") or "",
})
self._log_event(
"ansible component dispatched successfully",
job_id=job_id,
host=host,
run_id=run_row_id,
extra={
"component_path": link.get("component_path"),
"activity_id": link.get("activity_id"),
},
)
except Exception as exc:
self._log_event(
"ansible component dispatch failed",
job_id=job_id,
host=host,
run_id=run_row_id,
level="ERROR",
extra={
"component_path": comp.get("path"),
"error": str(exc),
},
)
try:
c2.execute(
"UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?",
(ts_now, ts_now, str(exc)[:512], run_row_id),
)
conn2.commit()
except Exception:
pass
except Exception as inner_exc:
self._log_event(
"failed to record ansible dispatch failure on run",
job_id=job_id,
host=host,
run_id=run_row_id,
level="ERROR",
extra={"error": str(inner_exc)},
)
continue
if activity_links:
try:
@@ -1263,48 +1569,92 @@ class JobScheduler:
),
)
conn2.commit()
except Exception:
pass
self._log_event(
"linked run to activity records",
job_id=job_id,
host=host,
run_id=run_row_id,
extra={"activity_count": len(activity_links)},
)
except Exception as exc:
self._log_event(
"failed to link run to activity records",
job_id=job_id,
host=host,
run_id=run_row_id,
level="ERROR",
extra={"error": str(exc)},
)
else:
if exp_seconds is not None and (occ + exp_seconds) <= now:
ts_now = _now_ts()
try:
c2.execute(
"INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, finished_ts, status, error, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)",
(job_id, host, occ, ts_now, "Expired", "Device offline", ts_now, ts_now),
)
conn2.commit()
self._log_event(
"marked run as expired due to offline device",
job_id=job_id,
host=host,
level="WARNING",
extra={"scheduled_ts": occ, "expiration_seconds": exp_seconds},
)
except Exception as exc:
self._log_event(
"failed to mark run as expired for offline device",
job_id=job_id,
host=host,
level="ERROR",
extra={"error": str(exc)},
)
else:
self._log_event(
"host offline; waiting before expiring run",
job_id=job_id,
host=host,
extra={"scheduled_ts": occ, "expiration_seconds": exp_seconds},
)
except Exception as exc:
self._log_event(
"unexpected failure while evaluating host for job",
job_id=job_id,
host=host,
level="ERROR",
extra={"error": str(exc), "scheduled_ts": occ},
)
finally:
try:
if conn2:
conn2.close()
except Exception:
pass
finally:
conn2.close()
else:
# If expired window is configured and has passed, mark this host as Expired
if exp_seconds is not None and (occ + exp_seconds) <= now:
try:
ts_now = _now_ts()
c2.execute(
"INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, finished_ts, status, error, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)",
(job_id, host, occ, ts_now, "Expired", "Device offline", ts_now, ts_now),
)
conn2.commit()
except Exception:
pass
finally:
conn2.close()
else:
# keep waiting; no record created yet
try:
conn2.close()
except Exception:
pass
except Exception:
# Keep loop resilient
continue
except Exception as exc:
self._log_event(
"unhandled exception while processing job during tick",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)
pass

self._log_event("tick end", extra={"now_ts": now})

def start(self):
if self._running:
self._log_event("start requested but scheduler already running")
return
self._running = True
self._log_event("scheduler loop starting")

def _loop():
# cooperative loop aligned to minutes
while self._running:
try:
self._tick_once()
except Exception:
pass
except Exception as exc:
self._log_event("unhandled exception during scheduler tick", level="ERROR", extra={"error": repr(exc)})
# Sleep until next minute boundary
delay = 60 - (_now_ts() % 60)
try:
@@ -1316,10 +1666,12 @@ class JobScheduler:
# Use SocketIO helper so it integrates with eventlet
try:
self.socketio.start_background_task(_loop)
self._log_event("scheduler loop spawned via socketio task")
except Exception:
# Fallback to thread
import threading
threading.Thread(target=_loop, daemon=True).start()
self._log_event("scheduler loop spawned via threading fallback")

# ---------- Route registration ----------
def _register_routes(self):
@@ -1832,9 +2184,9 @@ class JobScheduler:
return {}


def register(app, socketio, db_path: str, script_signer=None) -> JobScheduler:
def register(app, socketio, db_path: str, script_signer=None, service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None) -> JobScheduler:
"""Factory to create and return a JobScheduler instance."""
return JobScheduler(app, socketio, db_path, script_signer=script_signer)
return JobScheduler(app, socketio, db_path, script_signer=script_signer, service_logger=service_logger)


def set_online_lookup(scheduler: JobScheduler, fn: Callable[[], List[str]]):

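The tick logic above leans on helpers whose bodies do not appear in this diff, notably _parse_expiration and _should_expire. The sketch below gives plausible shapes that merely match how they are called (an expiration window in seconds compared against started_ts/occ and now); they are assumptions, not the repository's implementations, and _should_expire is actually a JobScheduler method rather than a plain function.

from typing import Optional

def _parse_expiration(expiration) -> Optional[int]:
    # Interpret an expiration value as a positive number of seconds, if possible.
    try:
        seconds = int(expiration)
        return seconds if seconds > 0 else None
    except (TypeError, ValueError):
        return None

def _should_expire(started_ts, expiration, now: int) -> bool:
    # True when a Running run has outlived its expiration window.
    exp_seconds = _parse_expiration(expiration)
    if not started_ts or exp_seconds is None:
        return False
    return (int(started_ts) + exp_seconds) <= now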
@@ -17,7 +17,8 @@
"""Scheduled job management integration for the Borealis Engine runtime."""
from __future__ import annotations

from typing import TYPE_CHECKING
import time
from typing import TYPE_CHECKING, List

from . import job_scheduler

@@ -38,12 +39,56 @@ def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"):
database_path = adapters.context.database_path
script_signer = adapters.script_signer

def _online_hostnames_snapshot() -> List[str]:
"""Return hostnames deemed online based on recent agent heartbeats."""
threshold = int(time.time()) - 300
conn = None
try:
conn = adapters.db_conn_factory()
cur = conn.cursor()
cur.execute(
"SELECT hostname FROM devices WHERE last_seen IS NOT NULL AND last_seen >= ?",
(threshold,),
)
rows = cur.fetchall()
except Exception as exc:
adapters.service_log(
"scheduled_jobs",
f"online host snapshot lookup failed err={exc}",
level="ERROR",
)
rows = []
finally:
try:
if conn is not None:
conn.close()
except Exception:
pass

seen = set()
hostnames: List[str] = []
for row in rows or []:
try:
raw = row[0] if isinstance(row, (list, tuple)) else row
name = str(raw or "").strip()
except Exception:
name = ""
if not name:
continue
for variant in (name, name.upper(), name.lower()):
if variant and variant not in seen:
seen.add(variant)
hostnames.append(variant)
return hostnames

scheduler = job_scheduler.register(
app,
socketio,
database_path,
script_signer=script_signer,
service_logger=adapters.service_log,
)
job_scheduler.set_online_lookup(scheduler, _online_hostnames_snapshot)
scheduler.start()
adapters.context.scheduler = scheduler
adapters.service_log("scheduled_jobs", "engine scheduler initialised", level="INFO")
@@ -61,4 +106,3 @@ def register_management(app: "Flask", adapters: "EngineServiceAdapters") -> None
"""Ensure scheduled job routes are registered via the Engine scheduler."""

ensure_scheduler(app, adapters)

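A small illustration of the variant expansion performed by _online_hostnames_snapshot above: each hostname is emitted as-is plus upper- and lower-case variants, so the scheduler's `host in online` membership test tolerates case differences between job targets and the devices table. The sample hostname is made up.

rows = [("Workstation-01",)]
seen, hostnames = set(), []
for (raw,) in rows:
    name = str(raw or "").strip()
    for variant in (name, name.upper(), name.lower()):
        if variant and variant not in seen:
            seen.add(variant)
            hostnames.append(variant)

print(hostnames)  # ['Workstation-01', 'WORKSTATION-01', 'workstation-01']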
@@ -1,8 +1,46 @@
# ======================================================
# Data\Engine\services\API\server\info.py
# Description: Placeholder for server information endpoints.
# Description: Server information endpoints surfaced for administrative UX.
#
# API Endpoints (if applicable): None
# API Endpoints (if applicable):
# - GET /api/server/time (Operator Session) - Returns the server clock in multiple formats.
# ======================================================

"Placeholder for API module server/info.py."
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict

from flask import Blueprint, Flask, jsonify

if TYPE_CHECKING: # pragma: no cover - typing aide
from .. import EngineServiceAdapters


def _serialize_time(now_local: datetime, now_utc: datetime) -> Dict[str, Any]:
tz_label = now_local.tzname()
display = now_local.strftime("%Y-%m-%d %H:%M:%S %Z").strip()
if not display:
display = now_local.isoformat()
return {
"epoch": int(now_local.timestamp()),
"iso": now_local.isoformat(),
"utc": now_utc.isoformat(),
"timezone": tz_label,
"display": display,
}


def register_info(app: Flask, _adapters: "EngineServiceAdapters") -> None:
"""Expose server telemetry endpoints used by the admin interface."""

blueprint = Blueprint("engine_server_info", __name__)

@blueprint.route("/api/server/time", methods=["GET"])
def server_time() -> Any:
now_utc = datetime.now(timezone.utc)
now_local = now_utc.astimezone()
payload = _serialize_time(now_local, now_utc)
return jsonify(payload)

app.register_blueprint(blueprint)

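A quick sketch of exercising the new endpoint from a client. The base URL and port are placeholders, and an operator session (cookie or token) may be required per the header comment above; the key names come directly from _serialize_time.

import requests

resp = requests.get("http://localhost:5000/api/server/time", timeout=5)
data = resp.json()
# Keys produced by _serialize_time above:
#   epoch (int), iso (local ISO timestamp), utc (UTC ISO timestamp),
#   timezone (tz label), display (human-readable local time)
print(data["display"], data["timezone"])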