Merge pull request #149 from bunny-lab-io/codex/port-legacy_job_scheduler-to-new-engine

Port scheduler and migrations into Engine
Committed by GitHub on 2025-11-02 00:09:09 -06:00
9 changed files with 2472 additions and 54 deletions

View File

@@ -16,6 +16,33 @@ from typing import Iterator
import pytest
from flask import Flask
import sys
import types
import importlib.machinery
def _ensure_eventlet_stub() -> None:
if "eventlet" in sys.modules:
return
eventlet_module = types.ModuleType("eventlet")
eventlet_module.monkey_patch = lambda **_kwargs: None # type: ignore[attr-defined]
eventlet_module.sleep = lambda _seconds: None # type: ignore[attr-defined]
wsgi_module = types.ModuleType("eventlet.wsgi")
wsgi_module.HttpProtocol = object() # type: ignore[attr-defined]
eventlet_module.wsgi = wsgi_module # type: ignore[attr-defined]
eventlet_module.__spec__ = importlib.machinery.ModuleSpec("eventlet", loader=None)
wsgi_module.__spec__ = importlib.machinery.ModuleSpec("eventlet.wsgi", loader=None)
sys.modules["eventlet"] = eventlet_module
sys.modules["eventlet.wsgi"] = wsgi_module
_ensure_eventlet_stub()
from Data.Engine.server import create_app
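The stub works because Python consults `sys.modules` before searching the filesystem, so registering a `ModuleType` under the dependency's name short-circuits the real import; the `ModuleSpec` keeps tooling that inspects `__spec__` happy. A self-contained illustration of the same technique, using a hypothetical module name:

```python
import importlib.machinery
import sys
import types

# Register a fake module before anything tries to import it.
stub = types.ModuleType("fake_heavy_dep")
stub.__spec__ = importlib.machinery.ModuleSpec("fake_heavy_dep", loader=None)
stub.do_work = lambda: "stubbed"  # type: ignore[attr-defined]
sys.modules["fake_heavy_dep"] = stub

import fake_heavy_dep  # resolves to the stub; no filesystem search happens

assert fake_heavy_dep.do_work() == "stubbed"
```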

View File

@@ -37,6 +37,18 @@ def test_list_devices(engine_harness: EngineTestHarness) -> None:
assert "summary" in device and isinstance(device["summary"], dict)
def test_list_agents(engine_harness: EngineTestHarness) -> None:
client = engine_harness.app.test_client()
response = client.get("/api/agents")
assert response.status_code == 200
payload = response.get_json()
assert isinstance(payload, dict)
assert payload, "expected at least one agent in the response"
first_agent = next(iter(payload.values()))
assert first_agent["hostname"] == "test-device"
assert first_agent["agent_id"] == "test-device-agent"
def test_device_details(engine_harness: EngineTestHarness) -> None:
client = engine_harness.app.test_client()
response = client.get("/api/device/details/test-device")

View File

@@ -0,0 +1,83 @@
# ======================================================
# Data\Engine\Unit_Tests\test_scheduler_timing.py
# Description: Validates the Engine job scheduler's interval calculations to
# ensure jobs are queued on the expected cadence.
#
# API Endpoints (if applicable): None
# ======================================================
from __future__ import annotations
import datetime as dt
from pathlib import Path
from typing import Callable, List, Tuple
from flask import Flask
from Data.Engine.services.API.scheduled_jobs import job_scheduler
class _DummySocketIO:
def __init__(self) -> None:
self.started_tasks: List[Tuple[Callable, tuple, dict]] = []
def start_background_task(self, target: Callable, *args, **kwargs):
# The scheduler calls into Socket.IO to spawn the background loop.
# Tests only verify the scheduling math, so we capture the request
# without launching a greenlet/thread.
self.started_tasks.append((target, args, kwargs))
return None
def emit(self, *_args, **_kwargs):
return None
def _make_scheduler(tmp_path: Path) -> job_scheduler.JobScheduler:
app = Flask(__name__)
db_path = tmp_path / "scheduler.sqlite3"
return job_scheduler.JobScheduler(app, _DummySocketIO(), str(db_path))
def _ts(year: int, month: int, day: int, hour: int = 0, minute: int = 0) -> int:
return int(dt.datetime(year, month, day, hour, minute, tzinfo=dt.timezone.utc).timestamp())
def test_immediate_schedule_runs_once(tmp_path):
scheduler = _make_scheduler(tmp_path)
now = _ts(2024, 3, 1, 12, 0)
assert scheduler._compute_next_run("immediately", None, None, now) == now
assert scheduler._compute_next_run("immediately", None, now, now) is None
def test_hourly_schedule_advances_increments(tmp_path):
scheduler = _make_scheduler(tmp_path)
start = _ts(2024, 3, 1, 9, 0)
now = _ts(2024, 3, 1, 9, 30)
assert scheduler._compute_next_run("every_hour", start, None, now) == start
next_candidate = scheduler._compute_next_run("every_hour", start, start, now)
assert next_candidate == _ts(2024, 3, 1, 10, 0)
def test_daily_schedule_rolls_forward(tmp_path):
scheduler = _make_scheduler(tmp_path)
start = _ts(2024, 3, 1, 6, 15)
after_two_days = _ts(2024, 3, 3, 5, 0)
next_run = scheduler._compute_next_run("daily", start, start, after_two_days)
assert next_run == _ts(2024, 3, 2, 6, 15)
def test_monthly_schedule_handles_late_month(tmp_path):
scheduler = _make_scheduler(tmp_path)
start = _ts(2024, 1, 31, 8, 0)
after_two_months = _ts(2024, 3, 5, 8, 0)
next_run = scheduler._compute_next_run("monthly", start, start, after_two_months)
# February 2024 has 29 days, so the scheduler should clamp to Feb 29th.
assert next_run == _ts(2024, 2, 29, 8, 0)
def test_yearly_schedule_rolls_forward(tmp_path):
scheduler = _make_scheduler(tmp_path)
start = _ts(2023, 2, 28, 0, 0)
after_two_years = _ts(2025, 3, 1, 0, 0)
next_run = scheduler._compute_next_run("yearly", start, start, after_two_years)
assert next_run == _ts(2024, 2, 28, 0, 0)
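The monthly case above hinges on day-of-month clamping: advancing January 31st by one month must land on February 29th in a leap year rather than raising or skipping ahead. A sketch of that calculation, illustrating the behaviour the test expects rather than the scheduler's actual implementation:

```python
import calendar
import datetime as dt

def add_one_month(moment: dt.datetime) -> dt.datetime:
    # Roll the month forward, clamping the day to the target month's length.
    year = moment.year + (moment.month // 12)
    month = moment.month % 12 + 1
    last_day = calendar.monthrange(year, month)[1]
    return moment.replace(year=year, month=month, day=min(moment.day, last_day))

jan31 = dt.datetime(2024, 1, 31, 8, 0, tzinfo=dt.timezone.utc)
assert add_one_month(jan31) == dt.datetime(2024, 2, 29, 8, 0, tzinfo=dt.timezone.utc)
```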

View File

@@ -15,10 +15,7 @@ import time
from pathlib import Path
from typing import Optional, Sequence
-try: # pragma: no cover - legacy dependency shim
-    from Modules import db_migrations # type: ignore
-except Exception: # pragma: no cover - absent in some test contexts
-    db_migrations = None # type: ignore
+from . import database_migrations
_DEFAULT_ADMIN_HASH = (
@@ -44,7 +41,7 @@ def initialise_engine_database(database_path: str, *, logger: Optional[logging.L
conn = sqlite3.connect(str(path))
try:
-        _apply_legacy_migrations(conn, logger=logger)
+        _apply_engine_migrations(conn, logger=logger)
_restore_persisted_enrollment_codes(conn, logger=logger)
_ensure_activity_history(conn, logger=logger)
_ensure_device_list_views(conn, logger=logger)
@@ -66,16 +63,12 @@ def initialise_engine_database(database_path: str, *, logger: Optional[logging.L
conn.close()
-def _apply_legacy_migrations(conn: sqlite3.Connection, *, logger: Optional[logging.Logger]) -> None:
-    if db_migrations is None:
-        if logger:
-            logger.debug("Legacy db_migrations module not available; skipping schema sync.")
-        return
+def _apply_engine_migrations(conn: sqlite3.Connection, *, logger: Optional[logging.Logger]) -> None:
    try:
-        db_migrations.apply_all(conn)
+        database_migrations.apply_all(conn)
    except Exception as exc:
        if logger:
-            logger.error("Legacy schema migration failed: %s", exc, exc_info=True)
+            logger.error("Engine schema migration failed: %s", exc, exc_info=True)
        else: # pragma: no cover - escalated in tests if logger absent
            raise
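The replacement keeps the fail-soft contract of the legacy helper: with a logger attached, a migration failure is recorded and startup continues; without one, the exception propagates, which is how failures surface in tests. A minimal sketch of that pattern, with a hypothetical migration step standing in for `database_migrations.apply_all`:

```python
import logging
import sqlite3
from typing import Optional

def apply_engine_migrations_sketch(
    conn: sqlite3.Connection, logger: Optional[logging.Logger] = None
) -> None:
    try:
        # Stand-in for database_migrations.apply_all(conn).
        conn.execute("CREATE TABLE IF NOT EXISTS example (id TEXT PRIMARY KEY)")
    except Exception as exc:
        if logger:
            logger.error("Engine schema migration failed: %s", exc, exc_info=True)
        else:
            raise  # no logger available: escalate so callers see the failure
```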

View File

@@ -0,0 +1,487 @@
# ======================================================
# Data\Engine\database_migrations.py
# Description: Provides schema evolution helpers for the Engine sqlite
# database without importing the legacy ``Modules`` package.
# ======================================================
"""Engine database schema migration helpers."""
from __future__ import annotations
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import List, Optional, Sequence, Tuple
DEVICE_TABLE = "devices"
def apply_all(conn: sqlite3.Connection) -> None:
"""
Run all known schema migrations against the provided sqlite3 connection.
"""
_ensure_devices_table(conn)
_ensure_device_aux_tables(conn)
_ensure_refresh_token_table(conn)
_ensure_install_code_table(conn)
_ensure_install_code_persistence_table(conn)
_ensure_device_approval_table(conn)
conn.commit()
def _ensure_devices_table(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
if not _table_exists(cur, DEVICE_TABLE):
_create_devices_table(cur)
return
column_info = _table_info(cur, DEVICE_TABLE)
col_names = [c[1] for c in column_info]
pk_cols = [c[1] for c in column_info if c[5]]
needs_rebuild = pk_cols != ["guid"]
required_columns = {
"guid": "TEXT",
"hostname": "TEXT",
"description": "TEXT",
"created_at": "INTEGER",
"agent_hash": "TEXT",
"memory": "TEXT",
"network": "TEXT",
"software": "TEXT",
"storage": "TEXT",
"cpu": "TEXT",
"device_type": "TEXT",
"domain": "TEXT",
"external_ip": "TEXT",
"internal_ip": "TEXT",
"last_reboot": "TEXT",
"last_seen": "INTEGER",
"last_user": "TEXT",
"operating_system": "TEXT",
"uptime": "INTEGER",
"agent_id": "TEXT",
"ansible_ee_ver": "TEXT",
"connection_type": "TEXT",
"connection_endpoint": "TEXT",
"ssl_key_fingerprint": "TEXT",
"token_version": "INTEGER",
"status": "TEXT",
"key_added_at": "TEXT",
}
missing_columns = [col for col in required_columns if col not in col_names]
if missing_columns:
needs_rebuild = True
if needs_rebuild:
_rebuild_devices_table(conn, column_info)
else:
_ensure_column_defaults(cur)
_ensure_device_indexes(cur)
def _ensure_device_aux_tables(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS device_keys (
id TEXT PRIMARY KEY,
guid TEXT NOT NULL,
ssl_key_fingerprint TEXT NOT NULL,
added_at TEXT NOT NULL,
retired_at TEXT
)
"""
)
cur.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS uq_device_keys_guid_fingerprint
ON device_keys(guid, ssl_key_fingerprint)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_device_keys_guid
ON device_keys(guid)
"""
)
def _ensure_refresh_token_table(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS refresh_tokens (
id TEXT PRIMARY KEY,
guid TEXT NOT NULL,
token_hash TEXT NOT NULL,
dpop_jkt TEXT,
created_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
revoked_at TEXT,
last_used_at TEXT
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_refresh_tokens_guid
ON refresh_tokens(guid)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_refresh_tokens_expires_at
ON refresh_tokens(expires_at)
"""
)
def _ensure_install_code_table(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS enrollment_install_codes (
id TEXT PRIMARY KEY,
code TEXT NOT NULL UNIQUE,
expires_at TEXT NOT NULL,
created_by_user_id TEXT,
used_at TEXT,
used_by_guid TEXT,
max_uses INTEGER NOT NULL DEFAULT 1,
use_count INTEGER NOT NULL DEFAULT 0,
last_used_at TEXT
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_eic_expires_at
ON enrollment_install_codes(expires_at)
"""
)
columns = {row[1] for row in _table_info(cur, "enrollment_install_codes")}
if "max_uses" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes
ADD COLUMN max_uses INTEGER NOT NULL DEFAULT 1
"""
)
if "use_count" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes
ADD COLUMN use_count INTEGER NOT NULL DEFAULT 0
"""
)
if "last_used_at" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes
ADD COLUMN last_used_at TEXT
"""
)
def _ensure_install_code_persistence_table(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS enrollment_install_codes_persistent (
id TEXT PRIMARY KEY,
code TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
created_by_user_id TEXT,
used_at TEXT,
used_by_guid TEXT,
max_uses INTEGER NOT NULL DEFAULT 1,
last_known_use_count INTEGER NOT NULL DEFAULT 0,
last_used_at TEXT,
is_active INTEGER NOT NULL DEFAULT 1,
archived_at TEXT,
consumed_at TEXT
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_eicp_active
ON enrollment_install_codes_persistent(is_active, expires_at)
"""
)
cur.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS uq_eicp_code
ON enrollment_install_codes_persistent(code)
"""
)
columns = {row[1] for row in _table_info(cur, "enrollment_install_codes_persistent")}
if "last_known_use_count" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes_persistent
ADD COLUMN last_known_use_count INTEGER NOT NULL DEFAULT 0
"""
)
if "archived_at" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes_persistent
ADD COLUMN archived_at TEXT
"""
)
if "consumed_at" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes_persistent
ADD COLUMN consumed_at TEXT
"""
)
if "is_active" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes_persistent
ADD COLUMN is_active INTEGER NOT NULL DEFAULT 1
"""
)
if "used_at" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes_persistent
ADD COLUMN used_at TEXT
"""
)
if "used_by_guid" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes_persistent
ADD COLUMN used_by_guid TEXT
"""
)
if "last_used_at" not in columns:
cur.execute(
"""
ALTER TABLE enrollment_install_codes_persistent
ADD COLUMN last_used_at TEXT
"""
)
def _ensure_device_approval_table(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS device_approvals (
id TEXT PRIMARY KEY,
approval_reference TEXT NOT NULL UNIQUE,
guid TEXT,
hostname_claimed TEXT NOT NULL,
ssl_key_fingerprint_claimed TEXT NOT NULL,
enrollment_code_id TEXT NOT NULL,
status TEXT NOT NULL,
client_nonce TEXT NOT NULL,
server_nonce TEXT NOT NULL,
agent_pubkey_der BLOB NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
approved_by_user_id TEXT
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_da_status
ON device_approvals(status)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_da_fp_status
ON device_approvals(ssl_key_fingerprint_claimed, status)
"""
)
def _create_devices_table(cur: sqlite3.Cursor) -> None:
cur.execute(
"""
CREATE TABLE devices (
guid TEXT PRIMARY KEY,
hostname TEXT,
description TEXT,
created_at INTEGER,
agent_hash TEXT,
memory TEXT,
network TEXT,
software TEXT,
storage TEXT,
cpu TEXT,
device_type TEXT,
domain TEXT,
external_ip TEXT,
internal_ip TEXT,
last_reboot TEXT,
last_seen INTEGER,
last_user TEXT,
operating_system TEXT,
uptime INTEGER,
agent_id TEXT,
ansible_ee_ver TEXT,
connection_type TEXT,
connection_endpoint TEXT,
ssl_key_fingerprint TEXT,
token_version INTEGER DEFAULT 1,
status TEXT DEFAULT 'active',
key_added_at TEXT
)
"""
)
_ensure_device_indexes(cur)
def _ensure_device_indexes(cur: sqlite3.Cursor) -> None:
cur.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS uq_devices_hostname
ON devices(hostname)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_devices_ssl_key
ON devices(ssl_key_fingerprint)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_devices_status
ON devices(status)
"""
)
def _ensure_column_defaults(cur: sqlite3.Cursor) -> None:
cur.execute(
"""
UPDATE devices
SET token_version = COALESCE(token_version, 1)
WHERE token_version IS NULL
"""
)
cur.execute(
"""
UPDATE devices
SET status = COALESCE(status, 'active')
WHERE status IS NULL OR status = ''
"""
)
def _rebuild_devices_table(conn: sqlite3.Connection, column_info: Sequence[Tuple]) -> None:
cur = conn.cursor()
cur.execute("PRAGMA foreign_keys=OFF")
cur.execute("BEGIN IMMEDIATE")
cur.execute("ALTER TABLE devices RENAME TO devices_legacy")
_create_devices_table(cur)
legacy_columns = [c[1] for c in column_info]
cur.execute(f"SELECT {', '.join(legacy_columns)} FROM devices_legacy")
rows = cur.fetchall()
insert_sql = (
"""
INSERT OR REPLACE INTO devices (
guid, hostname, description, created_at, agent_hash, memory,
network, software, storage, cpu, device_type, domain, external_ip,
internal_ip, last_reboot, last_seen, last_user, operating_system,
uptime, agent_id, ansible_ee_ver, connection_type, connection_endpoint,
ssl_key_fingerprint, token_version, status, key_added_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
)
for row in rows:
record = dict(zip(legacy_columns, row))
guid = _normalized_guid(record.get("guid"))
if not guid:
guid = str(uuid.uuid4())
hostname = record.get("hostname")
created_at = record.get("created_at")
key_added_at = record.get("key_added_at")
if key_added_at is None:
key_added_at = _default_key_added_at(created_at)
params: Tuple = (
guid,
hostname,
record.get("description"),
created_at,
record.get("agent_hash"),
record.get("memory"),
record.get("network"),
record.get("software"),
record.get("storage"),
record.get("cpu"),
record.get("device_type"),
record.get("domain"),
record.get("external_ip"),
record.get("internal_ip"),
record.get("last_reboot"),
record.get("last_seen"),
record.get("last_user"),
record.get("operating_system"),
record.get("uptime"),
record.get("agent_id"),
record.get("ansible_ee_ver"),
record.get("connection_type"),
record.get("connection_endpoint"),
record.get("ssl_key_fingerprint"),
record.get("token_version") or 1,
record.get("status") or "active",
key_added_at,
)
cur.execute(insert_sql, params)
cur.execute("DROP TABLE devices_legacy")
cur.execute("COMMIT")
cur.execute("PRAGMA foreign_keys=ON")
def _default_key_added_at(created_at: Optional[int]) -> Optional[str]:
if created_at:
try:
dt = datetime.fromtimestamp(int(created_at), tz=timezone.utc)
return dt.isoformat()
except Exception:
pass
return datetime.now(tz=timezone.utc).isoformat()
def _table_exists(cur: sqlite3.Cursor, name: str) -> bool:
cur.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
(name,),
)
return cur.fetchone() is not None
def _table_info(cur: sqlite3.Cursor, name: str) -> List[Tuple]:
cur.execute(f"PRAGMA table_info({name})")
return cur.fetchall()
def _normalized_guid(value: Optional[str]) -> str:
if not value:
return ""
return str(value).strip()
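Every statement in this module is guarded by `IF NOT EXISTS` or a `PRAGMA table_info` column check, so `apply_all` should be safe to run on every startup. A minimal usage sketch against an in-memory database, assuming the module is importable as `Data.Engine.database_migrations`:

```python
import sqlite3

from Data.Engine import database_migrations

conn = sqlite3.connect(":memory:")
database_migrations.apply_all(conn)
database_migrations.apply_all(conn)  # second run is a no-op (idempotent)

tables = {
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
}
assert {"devices", "refresh_tokens", "device_approvals"} <= tables
```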

View File

@@ -20,10 +20,8 @@ import importlib.util
import logging
import time
import ssl
-import sys
from dataclasses import dataclass
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from typing import Any, Mapping, Optional, Sequence, Tuple
@@ -99,21 +97,6 @@ if HttpProtocol is not None: # pragma: no branch - attribute exists in supporte
_SOCKETIO_ASYNC_MODE = "eventlet"
-# Ensure the legacy ``Modules`` package is importable when running from the
-# Engine deployment directory.
-_ENGINE_DIR = Path(__file__).resolve().parent
-_SEARCH_ROOTS = [
-    _ENGINE_DIR.parent / "Server",
-    _ENGINE_DIR.parent.parent / "Data" / "Server",
-    _ENGINE_DIR.parent.parent.parent / "Data" / "Server",
-]
-for root in _SEARCH_ROOTS:
-    modules_dir = root / "Modules"
-    if modules_dir.is_dir():
-        root_str = str(root)
-        if root_str not in sys.path:
-            sys.path.insert(0, root_str)
from .config import EngineSettings, initialise_engine_logger, load_runtime_config

View File

@@ -615,7 +615,11 @@ class DeviceManagementService:
payload["agent_id"] = agent_key
agents[agent_key] = payload
return {"agents": agents}, 200
# The legacy server exposed /api/agents as a mapping keyed by
# agent identifier. The Engine WebUI expects the same structure,
# so we return the flattened dictionary directly instead of
# wrapping it in another object.
return agents, 200
except Exception as exc:
self.logger.debug("Failed to list agents", exc_info=True)
return {"error": str(exc)}, 500

File diff suppressed because it is too large

View File

@@ -1,6 +1,6 @@
# ======================================================
# Data\Engine\services\API\scheduled_jobs\management.py
-# Description: Integrates the legacy job scheduler for CRUD operations within the Engine API.
+# Description: Integrates the Engine job scheduler for CRUD operations within the Engine API.
#
# API Endpoints (if applicable):
# - GET /api/scheduled_jobs (Token Authenticated) - Lists scheduled jobs with summary metadata.
@@ -19,36 +19,18 @@ from __future__ import annotations
from typing import TYPE_CHECKING
-try: # pragma: no cover - legacy module import guard
-    import job_scheduler as legacy_job_scheduler # type: ignore
-except Exception as exc: # pragma: no cover - runtime guard
-    legacy_job_scheduler = None # type: ignore
-    _SCHEDULER_IMPORT_ERROR = exc
-else:
-    _SCHEDULER_IMPORT_ERROR = None
+from . import job_scheduler
if TYPE_CHECKING: # pragma: no cover - typing aide
from flask import Flask
from .. import EngineServiceAdapters
-def _raise_scheduler_import() -> None:
-    if _SCHEDULER_IMPORT_ERROR is not None:
-        raise RuntimeError(
-            "Legacy job scheduler module could not be imported; ensure Data/Server/job_scheduler.py "
-            "remains available during the Engine migration."
-        ) from _SCHEDULER_IMPORT_ERROR
def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"):
"""Instantiate the legacy job scheduler and attach it to the Engine context."""
"""Instantiate the Engine job scheduler and attach it to the Engine context."""
if getattr(adapters.context, "scheduler", None) is not None:
return adapters.context.scheduler
-    _raise_scheduler_import()
socketio = getattr(adapters.context, "socketio", None)
if socketio is None:
raise RuntimeError("Socket.IO instance is required to initialise the scheduled job service.")
@@ -56,7 +38,7 @@ def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"):
database_path = adapters.context.database_path
script_signer = adapters.script_signer
-    scheduler = legacy_job_scheduler.register(
+    scheduler = job_scheduler.register(
app,
socketio,
database_path,
@@ -64,7 +46,7 @@ def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"):
)
scheduler.start()
adapters.context.scheduler = scheduler
adapters.service_log("scheduled_jobs", "legacy scheduler initialised", level="INFO")
adapters.service_log("scheduled_jobs", "engine scheduler initialised", level="INFO")
return scheduler
@@ -76,7 +58,7 @@ def get_scheduler(adapters: "EngineServiceAdapters"):
def register_management(app: "Flask", adapters: "EngineServiceAdapters") -> None:
"""Ensure scheduled job routes are registered via the legacy scheduler."""
"""Ensure scheduled job routes are registered via the Engine scheduler."""
ensure_scheduler(app, adapters)
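`ensure_scheduler` is idempotent: the first call constructs the scheduler and caches it on `adapters.context`, and subsequent calls (including those made via `get_scheduler` and `register_management`) return the cached instance. A stripped-down sketch of that caching pattern, with hypothetical stand-ins for the Engine types:

```python
from typing import Callable, Optional

class _Context:
    """Hypothetical stand-in for EngineServiceAdapters.context."""
    scheduler: Optional[object] = None

def ensure_singleton(ctx: _Context, factory: Callable[[], object]) -> object:
    # Only the first caller pays the construction cost; later calls reuse it.
    if ctx.scheduler is not None:
        return ctx.scheduler
    ctx.scheduler = factory()
    return ctx.scheduler

ctx = _Context()
assert ensure_singleton(ctx, object) is ensure_singleton(ctx, object)
```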