diff --git a/Data/Engine/bootstrapper.py b/Data/Engine/bootstrapper.py index 8520e3a1..5d4bc2a1 100644 --- a/Data/Engine/bootstrapper.py +++ b/Data/Engine/bootstrapper.py @@ -51,7 +51,16 @@ def _build_runtime_config() -> Dict[str, Any]: if api_groups_override: api_groups: Any = api_groups_override else: - api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs") + api_groups = ( + "core", + "auth", + "tokens", + "enrollment", + "devices", + "server", + "assemblies", + "scheduled_jobs", + ) return { "HOST": os.environ.get("BOREALIS_ENGINE_HOST", DEFAULT_HOST), diff --git a/Data/Engine/config.py b/Data/Engine/config.py index 6d95607d..f1838bd6 100644 --- a/Data/Engine/config.py +++ b/Data/Engine/config.py @@ -283,7 +283,16 @@ def load_runtime_config(overrides: Optional[Mapping[str, Any]] = None) -> Engine runtime_config.get("API_GROUPS") or os.environ.get("BOREALIS_API_GROUPS") ) if not api_groups: - api_groups = ("core", "auth", "tokens", "enrollment", "devices", "assemblies") + api_groups = ( + "core", + "auth", + "tokens", + "enrollment", + "devices", + "server", + "assemblies", + "scheduled_jobs", + ) settings = EngineSettings( database_path=database_path, diff --git a/Data/Engine/services/API/__init__.py b/Data/Engine/services/API/__init__.py index b1671af6..365457ef 100644 --- a/Data/Engine/services/API/__init__.py +++ b/Data/Engine/services/API/__init__.py @@ -41,8 +41,9 @@ from .devices import routes as device_routes from .devices.approval import register_admin_endpoints from .devices.management import register_management from .scheduled_jobs import management as scheduled_jobs_management +from .server import info as server_info -DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "assemblies", "scheduled_jobs") +DEFAULT_API_GROUPS: Sequence[str] = ("core", "auth", "tokens", "enrollment", "devices", "server", "assemblies", "scheduled_jobs") _SERVER_SCOPE_PATTERN = re.compile(r"\\b(?:scope|context|agent_context)=([A-Za-z0-9_-]+)", re.IGNORECASE) _SERVER_AGENT_ID_PATTERN = re.compile(r"\\bagent_id=([^\\s,]+)", re.IGNORECASE) @@ -274,11 +275,16 @@ def _register_assemblies(app: Flask, adapters: EngineServiceAdapters) -> None: register_execution(app, adapters) +def _register_server(app: Flask, adapters: EngineServiceAdapters) -> None: + server_info.register_info(app, adapters) + + _GROUP_REGISTRARS: Mapping[str, Callable[[Flask, EngineServiceAdapters], None]] = { "auth": register_auth, "tokens": _register_tokens, "enrollment": _register_enrollment, "devices": _register_devices, + "server": _register_server, "assemblies": _register_assemblies, "scheduled_jobs": _register_scheduled_jobs, } diff --git a/Data/Engine/services/API/scheduled_jobs/job_scheduler.py b/Data/Engine/services/API/scheduled_jobs/job_scheduler.py index f9ef0296..9b320f2e 100644 --- a/Data/Engine/services/API/scheduled_jobs/job_scheduler.py +++ b/Data/Engine/services/API/scheduled_jobs/job_scheduler.py @@ -11,6 +11,7 @@ from __future__ import annotations import base64 import json +import logging import os import re import sqlite3 @@ -318,12 +319,13 @@ def _to_dt_tuple(ts: int) -> Tuple[int, int, int, int, int, int]: class JobScheduler: - def __init__(self, app, socketio, db_path: str, script_signer=None): + def __init__(self, app, socketio, db_path: str, script_signer=None, service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None): self.app = app self.socketio = socketio self.db_path = db_path self._script_signer = script_signer self._running = False + self._service_log = service_logger # Simulated run duration to hold jobs in "Running" before Success self.SIMULATED_RUN_SECONDS = int(os.environ.get("BOREALIS_SIM_RUN_SECONDS", "30")) # Retention for run history (days) @@ -341,6 +343,42 @@ class JobScheduler: # Bind routes self._register_routes() + def _log_event( + self, + message: str, + *, + level: str = "INFO", + job_id: Optional[int] = None, + host: Optional[str] = None, + run_id: Optional[int] = None, + extra: Optional[Dict[str, Any]] = None, + ) -> None: + fragments: List[str] = [] + if job_id is not None: + fragments.append(f"job={job_id}") + if run_id is not None: + fragments.append(f"run={run_id}") + if host: + fragments.append(f"host={host}") + if extra: + for key, value in extra.items(): + fragments.append(f"{key}={value}") + payload = message + if fragments: + payload = f"{message} | " + " ".join(str(fragment) for fragment in fragments) + scope = f"job-{job_id}" if job_id is not None else None + try: + if callable(self._service_log): + self._service_log("scheduled_jobs", payload, scope=scope, level=level) + return + except Exception: + pass + try: + numeric_level = getattr(logging, level.upper(), logging.INFO) + self.app.logger.log(numeric_level, "[Scheduler] %s", payload) + except Exception: + pass + # ---------- Helpers for dispatching scripts ---------- def _scripts_root(self) -> str: import os @@ -975,20 +1013,24 @@ class JobScheduler: Placeholder execution: mark Running then Success immediately. """ now = _now_ts() + self._log_event("tick begin", extra={"now_ts": now}) conn = self._conn() cur = conn.cursor() - # First, mark any stale Running runs that exceeded job expiration as Timed Out try: cur.execute( """ - SELECT r.id, r.started_ts, j.expiration + SELECT r.id, r.job_id, r.target_hostname, r.started_ts, j.expiration FROM scheduled_job_runs r JOIN scheduled_jobs j ON j.id = r.job_id WHERE r.status = 'Running' """ ) rows = cur.fetchall() - for rid, started_ts, expiration in rows: + self._log_event( + "evaluating running runs for expiration", + extra={"running_count": len(rows), "now_ts": now}, + ) + for rid, row_job_id, row_host, started_ts, expiration in rows: if self._should_expire(started_ts, expiration, now): try: c2 = conn.cursor() @@ -997,18 +1039,40 @@ class JobScheduler: (now, rid), ) conn.commit() - except Exception: - pass - except Exception: - pass + self._log_event( + "marked run as timed out due to expiration window", + level="WARNING", + job_id=row_job_id, + host=row_host, + run_id=rid, + extra={"started_ts": started_ts, "expiration": expiration}, + ) + except Exception as exc: + self._log_event( + "failed to mark run as timed out", + job_id=row_job_id, + host=row_host, + run_id=rid, + level="ERROR", + extra={"error": str(exc)}, + ) + except Exception as exc: + self._log_event( + "failed to evaluate running runs for expiration", + level="ERROR", + extra={"error": str(exc)}, + ) - # Next, finalize any Running runs that have passed the simulated duration try: cur.execute( - "SELECT id, started_ts FROM scheduled_job_runs WHERE status='Running'" + "SELECT id, job_id, target_hostname, started_ts FROM scheduled_job_runs WHERE status='Running'" ) rows = cur.fetchall() - for rid, started_ts in rows: + self._log_event( + "evaluating running runs for simulated completion", + extra={"running_count": len(rows), "now_ts": now, "simulated_window": self.SIMULATED_RUN_SECONDS}, + ) + for rid, row_job_id, row_host, started_ts in rows: if started_ts and (int(started_ts) + self.SIMULATED_RUN_SECONDS) <= now: try: c2 = conn.cursor() @@ -1017,12 +1081,29 @@ class JobScheduler: (now, now, rid), ) conn.commit() - except Exception: - pass - except Exception: - pass + self._log_event( + "auto-completed simulated run", + job_id=row_job_id, + host=row_host, + run_id=rid, + extra={"started_ts": started_ts}, + ) + except Exception as exc: + self._log_event( + "failed to auto-complete simulated run", + job_id=row_job_id, + host=row_host, + run_id=rid, + level="ERROR", + extra={"error": str(exc)}, + ) + except Exception as exc: + self._log_event( + "failed to auto-complete simulated runs", + level="ERROR", + extra={"error": str(exc)}, + ) - # Finally, rotate history older than the retention window try: cutoff = now - (self.RETENTION_DAYS * 86400) cur.execute( @@ -1030,26 +1111,64 @@ class JobScheduler: (cutoff,) ) conn.commit() - except Exception: - pass + self._log_event( + "purged scheduled_job_runs history older than retention window", + extra={"cutoff_ts": cutoff, "retention_days": self.RETENTION_DAYS}, + ) + except Exception as exc: + self._log_event( + "failed to purge scheduled_job_runs history", + level="ERROR", + extra={"error": str(exc)}, + ) + try: cur.execute( "SELECT id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, credential_id, use_service_account, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC" ) jobs = cur.fetchall() - except Exception: + self._log_event( + "loaded enabled scheduled jobs for tick", + extra={"job_count": len(jobs)}, + ) + except Exception as exc: jobs = [] - conn.close() + self._log_event( + "failed to load enabled scheduled jobs for tick", + level="ERROR", + extra={"error": str(exc)}, + ) + finally: + try: + conn.close() + except Exception: + pass - # Online hostnames snapshot for this tick online = set() try: if callable(self._online_lookup): online = set(self._online_lookup() or []) - except Exception: + except Exception as exc: + self._log_event( + "failed to gather online host snapshot", + level="ERROR", + extra={"error": str(exc)}, + ) online = set() - five_min = 300 + snapshot_hosts: List[str] = [] + try: + snapshot_hosts = sorted(list(online))[:20] + except Exception: + snapshot_hosts = list(online) + self._log_event( + "online host snapshot acquired", + extra={ + "online_count": len(online), + "hosts": ",".join(snapshot_hosts), + }, + ) + now_min = _now_minute() for ( @@ -1065,19 +1184,29 @@ class JobScheduler: created_at, ) in jobs: try: - # Targets list for this job try: targets = json.loads(targets_json or "[]") - except Exception: + except Exception as exc: targets = [] + self._log_event( + "failed to parse targets JSON for job", + job_id=job_id, + level="ERROR", + extra={"error": str(exc)}, + ) targets = [str(t) for t in targets if isinstance(t, (str, int))] total_targets = len(targets) - # Determine scripts to run for this job (first-pass: all 'script' components) try: comps = json.loads(components_json or "[]") - except Exception: + except Exception as exc: comps = [] + self._log_event( + "failed to parse components JSON for job", + job_id=job_id, + level="ERROR", + extra={"error": str(exc)}, + ) script_components = [] ansible_components = [] for c in comps: @@ -1095,7 +1224,13 @@ class JobScheduler: comp_copy = dict(c) comp_copy["path"] = p ansible_components.append(comp_copy) - except Exception: + except Exception as exc: + self._log_event( + "failed to normalise component entry", + job_id=job_id, + level="ERROR", + extra={"error": str(exc), "component": str(c)}, + ) continue run_mode = (execution_context or "system").strip().lower() job_credential_id = None @@ -1104,25 +1239,67 @@ class JobScheduler: job_use_service_account = False try: job_credential_id = int(credential_id) if credential_id is not None else None - except Exception: + except Exception as exc: job_credential_id = None + self._log_event( + "failed to parse credential id for job", + job_id=job_id, + level="ERROR", + extra={"error": str(exc)}, + ) + + self._log_event( + "job snapshot for tick evaluation", + job_id=job_id, + extra={ + "schedule_type": schedule_type, + "start_ts": start_ts, + "expiration": expiration, + "run_mode": run_mode, + "targets": total_targets, + "script_components": len(script_components), + "ansible_components": len(ansible_components), + "use_service_account": job_use_service_account, + "credential_id": job_credential_id, + "current_minute": now_min, + }, + ) + + if total_targets == 0: + self._log_event("job skipped due to zero targets", job_id=job_id) + continue + if not script_components and not ansible_components: + self._log_event("job skipped due to no runnable components", job_id=job_id) + continue exp_seconds = _parse_expiration(expiration) - # Determine current occurrence to work on occ = None + occ_from_runs = None + conn2 = None try: conn2 = self._conn() c2 = conn2.cursor() c2.execute("SELECT MAX(scheduled_ts) FROM scheduled_job_runs WHERE job_id=?", (job_id,)) occ_from_runs = c2.fetchone()[0] - conn2.close() - except Exception: - occ_from_runs = None + except Exception as exc: + self._log_event( + "failed to fetch last occurrence for job", + job_id=job_id, + level="ERROR", + extra={"error": str(exc)}, + ) + finally: + try: + if conn2: + conn2.close() + except Exception: + pass if occ_from_runs: occ = int(occ_from_runs) - # Check if occurrence is complete (terminal status per target) + done_count = 0 + conn2 = None try: conn2 = self._conn() c2 = conn2.cursor() @@ -1131,41 +1308,83 @@ class JobScheduler: (job_id, occ) ) done_count = int(c2.fetchone()[0] or 0) - conn2.close() - except Exception: + except Exception as exc: + self._log_event( + "failed to count completed targets for occurrence", + job_id=job_id, + level="ERROR", + extra={"error": str(exc), "occurrence_ts": occ}, + ) done_count = 0 + finally: + try: + if conn2: + conn2.close() + except Exception: + pass + + self._log_event( + "existing occurrence state evaluated", + job_id=job_id, + extra={ + "occurrence_ts": occ, + "completed_targets": done_count, + "total_targets": total_targets, + }, + ) if total_targets > 0 and done_count >= total_targets: - # Move to next occurrence if due in window nxt = self._compute_next_run(schedule_type, start_ts, occ, now_min) - if nxt is not None and now_min >= nxt and (now_min - nxt) <= five_min: + if nxt is not None and now_min >= nxt: + self._log_event( + "advancing to next occurrence", + job_id=job_id, + extra={"next_occurrence": nxt, "previous_occurrence": occ, "now_minute": now_min}, + ) occ = nxt else: - # Nothing to do this tick for this job + self._log_event( + "no action for job this tick; next occurrence not due", + job_id=job_id, + extra={ + "candidate_occurrence": nxt, + "current_occurrence": occ, + "now_minute": now_min, + }, + ) continue else: - # Continue working on this occurrence regardless of the 5-minute window until expiration - pass + self._log_event( + "continuing current occurrence", + job_id=job_id, + extra={"occurrence_ts": occ, "completed_targets": done_count, "total_targets": total_targets}, + ) else: - # No occurrence yet; derive initial occurrence - if (schedule_type or '').lower() == 'immediately': + if (schedule_type or "").lower() == "immediately": occ = _floor_minute(created_at or now_min) else: st_min = _floor_minute(start_ts) if start_ts else None if st_min is None: - # Try compute_next_run to derive first planned occurrence occ = self._compute_next_run(schedule_type, start_ts, None, now_min) else: occ = st_min if occ is None: + self._log_event( + "unable to determine initial occurrence for job; skipping", + job_id=job_id, + extra={"schedule_type": schedule_type, "start_ts": start_ts}, + ) continue - # For first occurrence, require it be within the 5-minute window to trigger - if not (now_min >= occ and (now_min - occ) <= five_min): + if now_min < occ: + self._log_event( + "job occurrence scheduled in the future; waiting", + job_id=job_id, + extra={"scheduled_minute": occ, "current_minute": now_min}, + ) continue - # For each target, if no run exists for this occurrence, either start it (if online) or keep waiting/expire for host in targets: - # Skip if a run already exists for this job/host/occurrence + conn2 = None try: conn2 = self._conn() c2 = conn2.cursor() @@ -1174,16 +1393,17 @@ class JobScheduler: (job_id, host, occ) ) row = c2.fetchone() - except Exception: - row = None - if row: - # Existing record - if Running, timeout handled earlier; skip - conn2.close() - continue + if row: + self._log_event( + "run already exists for host and occurrence; skipping new dispatch", + job_id=job_id, + host=host, + run_id=row[0], + extra={"status": row[1], "scheduled_ts": occ}, + ) + continue - # Start if online; otherwise, wait until expiration - if host in online: - try: + if host in online: ts_now = _now_ts() c2.execute( "INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, started_ts, status, created_at, updated_at) VALUES (?,?,?,?,?,?,?)", @@ -1191,7 +1411,18 @@ class JobScheduler: ) run_row_id = c2.lastrowid or 0 conn2.commit() - activity_links: List[Dict[str, Any]] = [] + self._log_event( + "created run record for host", + job_id=job_id, + host=host, + run_id=run_row_id, + extra={ + "scheduled_ts": occ, + "run_mode": run_mode, + "component_count": len(script_components) + len(ansible_components), + }, + ) + activity_links = [] remote_requires_cred = (run_mode == "ssh") or (run_mode == "winrm" and not job_use_service_account) if remote_requires_cred and not job_credential_id: err_msg = "Credential required for remote execution" @@ -1200,10 +1431,27 @@ class JobScheduler: (ts_now, ts_now, err_msg, run_row_id), ) conn2.commit() + self._log_event( + "run failed immediately due to missing credential", + job_id=job_id, + host=host, + run_id=run_row_id, + level="ERROR", + extra={"run_mode": run_mode}, + ) else: - # Dispatch all script components for this job to the target host for comp in script_components: try: + self._log_event( + "dispatching script component to host", + job_id=job_id, + host=host, + run_id=run_row_id, + extra={ + "component_path": comp.get("path"), + "run_mode": run_mode, + }, + ) link = self._dispatch_script(host, comp, run_mode) if link and link.get("activity_id"): activity_links.append({ @@ -1214,11 +1462,41 @@ class JobScheduler: "component_path": link.get("component_path") or "", "component_name": link.get("component_name") or "", }) - except Exception: + self._log_event( + "script component dispatched successfully", + job_id=job_id, + host=host, + run_id=run_row_id, + extra={ + "component_path": link.get("component_path"), + "activity_id": link.get("activity_id"), + }, + ) + except Exception as exc: + self._log_event( + "script component dispatch failed", + job_id=job_id, + host=host, + run_id=run_row_id, + level="ERROR", + extra={ + "component_path": comp.get("path"), + "error": str(exc), + }, + ) continue - # Dispatch ansible playbooks for this job to the target host for comp in ansible_components: try: + self._log_event( + "dispatching ansible component to host", + job_id=job_id, + host=host, + run_id=run_row_id, + extra={ + "component_path": comp.get("path"), + "run_mode": run_mode, + }, + ) link = self._dispatch_ansible( host, comp, @@ -1237,15 +1515,43 @@ class JobScheduler: "component_path": link.get("component_path") or "", "component_name": link.get("component_name") or "", }) + self._log_event( + "ansible component dispatched successfully", + job_id=job_id, + host=host, + run_id=run_row_id, + extra={ + "component_path": link.get("component_path"), + "activity_id": link.get("activity_id"), + }, + ) except Exception as exc: + self._log_event( + "ansible component dispatch failed", + job_id=job_id, + host=host, + run_id=run_row_id, + level="ERROR", + extra={ + "component_path": comp.get("path"), + "error": str(exc), + }, + ) try: c2.execute( "UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?", (ts_now, ts_now, str(exc)[:512], run_row_id), ) conn2.commit() - except Exception: - pass + except Exception as inner_exc: + self._log_event( + "failed to record ansible dispatch failure on run", + job_id=job_id, + host=host, + run_id=run_row_id, + level="ERROR", + extra={"error": str(inner_exc)}, + ) continue if activity_links: try: @@ -1263,48 +1569,92 @@ class JobScheduler: ), ) conn2.commit() - except Exception: - pass + self._log_event( + "linked run to activity records", + job_id=job_id, + host=host, + run_id=run_row_id, + extra={"activity_count": len(activity_links)}, + ) + except Exception as exc: + self._log_event( + "failed to link run to activity records", + job_id=job_id, + host=host, + run_id=run_row_id, + level="ERROR", + extra={"error": str(exc)}, + ) + else: + if exp_seconds is not None and (occ + exp_seconds) <= now: + ts_now = _now_ts() + try: + c2.execute( + "INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, finished_ts, status, error, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)", + (job_id, host, occ, ts_now, "Expired", "Device offline", ts_now, ts_now), + ) + conn2.commit() + self._log_event( + "marked run as expired due to offline device", + job_id=job_id, + host=host, + level="WARNING", + extra={"scheduled_ts": occ, "expiration_seconds": exp_seconds}, + ) + except Exception as exc: + self._log_event( + "failed to mark run as expired for offline device", + job_id=job_id, + host=host, + level="ERROR", + extra={"error": str(exc)}, + ) + else: + self._log_event( + "host offline; waiting before expiring run", + job_id=job_id, + host=host, + extra={"scheduled_ts": occ, "expiration_seconds": exp_seconds}, + ) + except Exception as exc: + self._log_event( + "unexpected failure while evaluating host for job", + job_id=job_id, + host=host, + level="ERROR", + extra={"error": str(exc), "scheduled_ts": occ}, + ) + finally: + try: + if conn2: + conn2.close() except Exception: pass - finally: - conn2.close() - else: - # If expired window is configured and has passed, mark this host as Expired - if exp_seconds is not None and (occ + exp_seconds) <= now: - try: - ts_now = _now_ts() - c2.execute( - "INSERT INTO scheduled_job_runs (job_id, target_hostname, scheduled_ts, finished_ts, status, error, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)", - (job_id, host, occ, ts_now, "Expired", "Device offline", ts_now, ts_now), - ) - conn2.commit() - except Exception: - pass - finally: - conn2.close() - else: - # keep waiting; no record created yet - try: - conn2.close() - except Exception: - pass - except Exception: - # Keep loop resilient - continue + except Exception as exc: + self._log_event( + "unhandled exception while processing job during tick", + job_id=job_id, + level="ERROR", + extra={"error": str(exc)}, + ) + pass + + self._log_event("tick end", extra={"now_ts": now}) def start(self): if self._running: + self._log_event("start requested but scheduler already running") return self._running = True + self._log_event("scheduler loop starting") def _loop(): # cooperative loop aligned to minutes while self._running: try: self._tick_once() - except Exception: - pass + except Exception as exc: + self._log_event("unhandled exception during scheduler tick", level="ERROR", extra={"error": repr(exc)}) # Sleep until next minute boundary delay = 60 - (_now_ts() % 60) try: @@ -1316,10 +1666,12 @@ class JobScheduler: # Use SocketIO helper so it integrates with eventlet try: self.socketio.start_background_task(_loop) + self._log_event("scheduler loop spawned via socketio task") except Exception: # Fallback to thread import threading threading.Thread(target=_loop, daemon=True).start() + self._log_event("scheduler loop spawned via threading fallback") # ---------- Route registration ---------- def _register_routes(self): @@ -1832,9 +2184,9 @@ class JobScheduler: return {} -def register(app, socketio, db_path: str, script_signer=None) -> JobScheduler: +def register(app, socketio, db_path: str, script_signer=None, service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None) -> JobScheduler: """Factory to create and return a JobScheduler instance.""" - return JobScheduler(app, socketio, db_path, script_signer=script_signer) + return JobScheduler(app, socketio, db_path, script_signer=script_signer, service_logger=service_logger) def set_online_lookup(scheduler: JobScheduler, fn: Callable[[], List[str]]): diff --git a/Data/Engine/services/API/scheduled_jobs/management.py b/Data/Engine/services/API/scheduled_jobs/management.py index 75f4e0ab..f1f8e58b 100644 --- a/Data/Engine/services/API/scheduled_jobs/management.py +++ b/Data/Engine/services/API/scheduled_jobs/management.py @@ -17,7 +17,8 @@ """Scheduled job management integration for the Borealis Engine runtime.""" from __future__ import annotations -from typing import TYPE_CHECKING +import time +from typing import TYPE_CHECKING, List from . import job_scheduler @@ -38,12 +39,56 @@ def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"): database_path = adapters.context.database_path script_signer = adapters.script_signer + def _online_hostnames_snapshot() -> List[str]: + """Return hostnames deemed online based on recent agent heartbeats.""" + threshold = int(time.time()) - 300 + conn = None + try: + conn = adapters.db_conn_factory() + cur = conn.cursor() + cur.execute( + "SELECT hostname FROM devices WHERE last_seen IS NOT NULL AND last_seen >= ?", + (threshold,), + ) + rows = cur.fetchall() + except Exception as exc: + adapters.service_log( + "scheduled_jobs", + f"online host snapshot lookup failed err={exc}", + level="ERROR", + ) + rows = [] + finally: + try: + if conn is not None: + conn.close() + except Exception: + pass + + seen = set() + hostnames: List[str] = [] + for row in rows or []: + try: + raw = row[0] if isinstance(row, (list, tuple)) else row + name = str(raw or "").strip() + except Exception: + name = "" + if not name: + continue + for variant in (name, name.upper(), name.lower()): + if variant and variant not in seen: + seen.add(variant) + hostnames.append(variant) + return hostnames + scheduler = job_scheduler.register( app, socketio, database_path, script_signer=script_signer, + service_logger=adapters.service_log, ) + job_scheduler.set_online_lookup(scheduler, _online_hostnames_snapshot) scheduler.start() adapters.context.scheduler = scheduler adapters.service_log("scheduled_jobs", "engine scheduler initialised", level="INFO") @@ -61,4 +106,3 @@ def register_management(app: "Flask", adapters: "EngineServiceAdapters") -> None """Ensure scheduled job routes are registered via the Engine scheduler.""" ensure_scheduler(app, adapters) - diff --git a/Data/Engine/services/API/server/info.py b/Data/Engine/services/API/server/info.py index 12b37628..187b1a7d 100644 --- a/Data/Engine/services/API/server/info.py +++ b/Data/Engine/services/API/server/info.py @@ -1,8 +1,46 @@ # ====================================================== # Data\Engine\services\API\server\info.py -# Description: Placeholder for server information endpoints. +# Description: Server information endpoints surfaced for administrative UX. # -# API Endpoints (if applicable): None +# API Endpoints (if applicable): +# - GET /api/server/time (Operator Session) - Returns the server clock in multiple formats. # ====================================================== -"Placeholder for API module server/info.py." +from __future__ import annotations + +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any, Dict + +from flask import Blueprint, Flask, jsonify + +if TYPE_CHECKING: # pragma: no cover - typing aide + from .. import EngineServiceAdapters + + +def _serialize_time(now_local: datetime, now_utc: datetime) -> Dict[str, Any]: + tz_label = now_local.tzname() + display = now_local.strftime("%Y-%m-%d %H:%M:%S %Z").strip() + if not display: + display = now_local.isoformat() + return { + "epoch": int(now_local.timestamp()), + "iso": now_local.isoformat(), + "utc": now_utc.isoformat(), + "timezone": tz_label, + "display": display, + } + + +def register_info(app: Flask, _adapters: "EngineServiceAdapters") -> None: + """Expose server telemetry endpoints used by the admin interface.""" + + blueprint = Blueprint("engine_server_info", __name__) + + @blueprint.route("/api/server/time", methods=["GET"]) + def server_time() -> Any: + now_utc = datetime.now(timezone.utc) + now_local = now_utc.astimezone() + payload = _serialize_time(now_local, now_utc) + return jsonify(payload) + + app.register_blueprint(blueprint)