From bdf5457de03fc3687e234279cf4a1bcd89ff9c4c Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Tue, 4 Nov 2025 04:03:12 -0700 Subject: [PATCH] Fixed Quick Jobs & Scheduled Jobs --- .../API/scheduled_jobs/job_scheduler.py | 566 +++++++++++++----- 1 file changed, 407 insertions(+), 159 deletions(-) diff --git a/Data/Engine/services/API/scheduled_jobs/job_scheduler.py b/Data/Engine/services/API/scheduled_jobs/job_scheduler.py index 9b320f2e..22b9ddac 100644 --- a/Data/Engine/services/API/scheduled_jobs/job_scheduler.py +++ b/Data/Engine/services/API/scheduled_jobs/job_scheduler.py @@ -16,8 +16,11 @@ import os import re import sqlite3 import time +import uuid from typing import Any, Callable, Dict, List, Optional, Tuple +from ...assemblies.service import AssemblyRuntimeService + _WINRM_USERNAME_VAR = "__borealis_winrm_username" _WINRM_PASSWORD_VAR = "__borealis_winrm_password" _WINRM_TRANSPORT_VAR = "__borealis_winrm_transport" @@ -319,14 +322,25 @@ def _to_dt_tuple(ts: int) -> Tuple[int, int, int, int, int, int]: class JobScheduler: - def __init__(self, app, socketio, db_path: str, script_signer=None, service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None): + def __init__( + self, + app, + socketio, + db_path: str, + *, + script_signer=None, + service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None, + assembly_runtime: Optional[AssemblyRuntimeService] = None, + ): self.app = app self.socketio = socketio self.db_path = db_path self._script_signer = script_signer self._running = False self._service_log = service_logger - # Simulated run duration to hold jobs in "Running" before Success + self._assembly_runtime = assembly_runtime + # Simulated run duration to hold jobs in "Running" before Success. + # Default is disabled (0) so that agent callbacks control run status. self.SIMULATED_RUN_SECONDS = int(os.environ.get("BOREALIS_SIM_RUN_SECONDS", "30")) # Retention for run history (days) self.RETENTION_DAYS = int(os.environ.get("BOREALIS_JOB_HISTORY_DAYS", "30")) @@ -342,6 +356,13 @@ class JobScheduler: # Bind routes self._register_routes() + self._log_event( + "scheduler initialised", + extra={ + "has_script_signer": bool(self._script_signer), + "retention_days": self.RETENTION_DAYS, + }, + ) def _log_event( self, @@ -404,6 +425,59 @@ class JobScheduler: except Exception: return False + def _resolve_runtime_document( + self, + rel_path: str, + default_type: str, + ) -> Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]: + runtime = self._assembly_runtime + if runtime is None or not rel_path: + return None, None + try: + record = runtime.resolve_document_by_source_path(rel_path) + except Exception as exc: + self._log_event( + "assembly cache lookup failed", + level="ERROR", + extra={"error": str(exc), "path": rel_path}, + ) + return None, None + if not record: + self._log_event( + "assembly not found in cache", + level="ERROR", + extra={"path": rel_path}, + ) + return None, None + payload_doc = record.get("payload_json") + if not isinstance(payload_doc, dict): + raw_payload = record.get("payload") + if isinstance(raw_payload, str): + try: + payload_doc = json.loads(raw_payload) + except Exception: + payload_doc = None + if not isinstance(payload_doc, dict): + self._log_event( + "assembly payload missing", + level="ERROR", + extra={"path": rel_path}, + ) + return None, None + doc = self._load_assembly_document(rel_path, default_type, payload=payload_doc) + if doc: + metadata_block = doc.get("metadata") + if not isinstance(metadata_block, dict): + metadata_block = {} + metadata_block.setdefault("assembly_guid", record.get("assembly_guid")) + record_meta = record.get("metadata", {}) + if isinstance(record_meta, dict): + metadata_block.setdefault("source_path", record_meta.get("source_path") or rel_path) + doc["metadata"] = metadata_block + if not doc.get("name"): + doc["name"] = record.get("display_name") or doc.get("name") + return doc, record + def _detect_script_type(self, filename: str) -> str: fn_lower = (filename or "").lower() if fn_lower.endswith(".json") and os.path.isfile(filename): @@ -427,8 +501,13 @@ class JobScheduler: return "bash" return "unknown" - def _load_assembly_document(self, abs_path: str, default_type: str) -> Dict[str, Any]: - base_name = os.path.splitext(os.path.basename(abs_path))[0] + def _load_assembly_document( + self, + source_identifier: str, + default_type: str, + payload: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + base_name = os.path.splitext(os.path.basename(source_identifier))[0] doc: Dict[str, Any] = { "name": base_name, "description": "", @@ -438,104 +517,110 @@ class JobScheduler: "variables": [], "files": [], "timeout_seconds": 3600, + "metadata": {}, } - if abs_path.lower().endswith(".json") and os.path.isfile(abs_path): + data: Dict[str, Any] = {} + if isinstance(payload, dict): + data = payload + elif source_identifier.lower().endswith(".json") and os.path.isfile(source_identifier): try: - with open(abs_path, "r", encoding="utf-8") as fh: + with open(source_identifier, "r", encoding="utf-8") as fh: data = json.load(fh) except Exception: data = {} - if isinstance(data, dict): - doc["name"] = str(data.get("name") or doc["name"]) - doc["description"] = str(data.get("description") or "") - cat = str(data.get("category") or doc["category"]).strip().lower() - if cat in ("application", "script"): - doc["category"] = cat - typ = str(data.get("type") or data.get("script_type") or default_type).strip().lower() - if typ in ("powershell", "batch", "bash", "ansible"): - doc["type"] = typ - script_val = data.get("script") - content_val = data.get("content") - script_lines = data.get("script_lines") - if isinstance(script_lines, list): - try: - doc["script"] = "\n".join(str(line) for line in script_lines) - except Exception: - doc["script"] = "" - elif isinstance(script_val, str): - doc["script"] = script_val - else: - if isinstance(content_val, str): - doc["script"] = content_val - encoding_hint = str(data.get("script_encoding") or data.get("scriptEncoding") or "").strip().lower() - doc["script"] = _decode_script_content(doc.get("script"), encoding_hint) - if encoding_hint in ("base64", "b64", "base-64"): - doc["script_encoding"] = "base64" - else: - probe_source = "" - if isinstance(script_val, str) and script_val: - probe_source = script_val - elif isinstance(content_val, str) and content_val: - probe_source = content_val - decoded_probe = _decode_base64_text(probe_source) if probe_source else None - if decoded_probe is not None: - doc["script_encoding"] = "base64" - doc["script"] = decoded_probe.replace("\r\n", "\n") - else: - doc["script_encoding"] = "plain" + if isinstance(data, dict) and data: + doc["name"] = str(data.get("name") or doc["name"]) + doc["description"] = str(data.get("description") or "") + doc["metadata"] = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + cat = str(data.get("category") or doc["category"]).strip().lower() + if cat in ("application", "script"): + doc["category"] = cat + typ = str(data.get("type") or data.get("script_type") or default_type).strip().lower() + if typ in ("powershell", "batch", "bash", "ansible"): + doc["type"] = typ + script_val = data.get("script") + content_val = data.get("content") + script_lines = data.get("script_lines") + if isinstance(script_lines, list): try: - timeout_raw = data.get("timeout_seconds", data.get("timeout")) - if timeout_raw is None: - doc["timeout_seconds"] = 3600 - else: - doc["timeout_seconds"] = max(0, int(timeout_raw)) + doc["script"] = "\n".join(str(line) for line in script_lines) except Exception: + doc["script"] = "" + elif isinstance(script_val, str): + doc["script"] = script_val + elif isinstance(content_val, str): + doc["script"] = content_val + encoding_hint = str(data.get("script_encoding") or data.get("scriptEncoding") or "").strip().lower() + doc["script"] = _decode_script_content(doc.get("script"), encoding_hint) + if encoding_hint in ("base64", "b64", "base-64"): + doc["script_encoding"] = "base64" + else: + probe_source = "" + if isinstance(script_val, str) and script_val: + probe_source = script_val + elif isinstance(content_val, str) and content_val: + probe_source = content_val + decoded_probe = _decode_base64_text(probe_source) if probe_source else None + if decoded_probe is not None: + doc["script_encoding"] = "base64" + doc["script"] = decoded_probe.replace("\r\n", "\n") + else: + doc["script_encoding"] = "plain" + try: + timeout_raw = data.get("timeout_seconds", data.get("timeout")) + if timeout_raw is None: doc["timeout_seconds"] = 3600 - vars_in = data.get("variables") if isinstance(data.get("variables"), list) else [] - doc["variables"] = [] - for v in vars_in: - if not isinstance(v, dict): - continue - name = str(v.get("name") or v.get("key") or "").strip() - if not name: - continue - vtype = str(v.get("type") or "string").strip().lower() - if vtype not in ("string", "number", "boolean", "credential"): - vtype = "string" - doc["variables"].append({ - "name": name, - "label": str(v.get("label") or ""), - "type": vtype, - "default": v.get("default", v.get("default_value")), - "required": bool(v.get("required")), - "description": str(v.get("description") or ""), - }) - files_in = data.get("files") if isinstance(data.get("files"), list) else [] - doc["files"] = [] - for f in files_in: - if not isinstance(f, dict): - continue - fname = f.get("file_name") or f.get("name") - if not fname or not isinstance(f.get("data"), str): - continue - try: - size_val = int(f.get("size") or 0) - except Exception: - size_val = 0 - doc["files"].append({ - "file_name": str(fname), - "size": size_val, - "mime_type": str(f.get("mime_type") or f.get("mimeType") or ""), - "data": f.get("data"), - }) + else: + doc["timeout_seconds"] = max(0, int(timeout_raw)) + except Exception: + doc["timeout_seconds"] = 3600 + vars_in = data.get("variables") if isinstance(data.get("variables"), list) else [] + doc["variables"] = [] + for v in vars_in: + if not isinstance(v, dict): + continue + name = str(v.get("name") or v.get("key") or "").strip() + if not name: + continue + vtype = str(v.get("type") or "string").strip().lower() + if vtype not in ("string", "number", "boolean", "credential"): + vtype = "string" + doc["variables"].append({ + "name": name, + "label": str(v.get("label") or ""), + "type": vtype, + "default": v.get("default", v.get("default_value")), + "required": bool(v.get("required")), + "description": str(v.get("description") or ""), + }) + files_in = data.get("files") if isinstance(data.get("files"), list) else [] + doc["files"] = [] + for f in files_in: + if not isinstance(f, dict): + continue + fname = f.get("file_name") or f.get("name") + if not fname or not isinstance(f.get("data"), str): + continue + try: + size_val = int(f.get("size") or 0) + except Exception: + size_val = 0 + doc["files"].append({ + "file_name": str(fname), + "size": size_val, + "mime_type": str(f.get("mime_type") or f.get("mimeType") or ""), + "data": f.get("data"), + }) return doc - try: - with open(abs_path, "r", encoding="utf-8", errors="replace") as fh: - content = fh.read() - except Exception: - content = "" - normalized_script = (content or "").replace("\r\n", "\n") - doc["script"] = normalized_script + if os.path.isfile(source_identifier): + try: + with open(source_identifier, "r", encoding="utf-8", errors="replace") as fh: + content = fh.read() + except Exception: + content = "" + doc["script"] = (content or "").replace("\r\n", "\n") + else: + doc["script"] = "" return doc def _ansible_root(self) -> str: @@ -562,7 +647,7 @@ class JobScheduler: use_service_account: bool = False, ) -> Optional[Dict[str, Any]]: try: - import os, uuid + import os ans_root = self._ansible_root() rel_path = "" overrides_map: Dict[str, Any] = {} @@ -587,11 +672,24 @@ class JobScheduler: overrides_map[name] = var.get("value") else: rel_path = str(component or "") - rel_norm = (rel_path or "").replace("\\", "/").lstrip("/") - abs_path = os.path.abspath(os.path.join(ans_root, rel_norm)) - if (not abs_path.startswith(ans_root)) or (not os.path.isfile(abs_path)): + rel_norm = (rel_path or "").replace("\\", "/").strip() + rel_norm = rel_norm.lstrip("/") + if rel_norm and not rel_norm.lower().startswith("ansible_playbooks/"): + rel_norm = f"Ansible_Playbooks/{rel_norm}" + rel_join = rel_norm + if rel_join.lower().startswith("ansible_playbooks/"): + rel_join = rel_join.split("/", 1)[1] if "/" in rel_join else "" + doc, record = self._resolve_runtime_document(rel_norm, "ansible") + if not doc: return None - doc = self._load_assembly_document(abs_path, "ansible") + assembly_source = "runtime" + metadata_block = doc.get("metadata") if isinstance(doc.get("metadata"), dict) else {} + assembly_guid = metadata_block.get("assembly_guid") if isinstance(metadata_block, dict) else None + friendly_name = (doc.get("name") or "").strip() + if not friendly_name: + friendly_name = os.path.basename(rel_norm) if rel_norm else f"Job-{scheduled_job_id}" + if not friendly_name: + friendly_name = f"Job-{scheduled_job_id}" content = doc.get("script") or "" normalized_script = (content or "").replace("\r\n", "\n") script_bytes = normalized_script.encode("utf-8") @@ -645,7 +743,7 @@ class JobScheduler: ( str(hostname), rel_norm, - doc.get("name") or os.path.basename(abs_path), + friendly_name, "ansible", now, "Running", @@ -658,6 +756,16 @@ class JobScheduler: finally: conn.close() + if server_run and (not abs_path or not os.path.isfile(abs_path)): + if not abs_path: + raise RuntimeError("Unable to stage Ansible playbook for server execution; no path resolved.") + try: + os.makedirs(os.path.dirname(abs_path), exist_ok=True) + with open(abs_path, "w", encoding="utf-8") as fh: + fh.write(normalized_script) + except Exception as exc: + raise RuntimeError(f"Unable to stage Ansible playbook for server execution: {exc}") + if server_run: if not credential_id: raise RuntimeError("Remote execution requires a credential_id") @@ -668,7 +776,7 @@ class JobScheduler: hostname=str(hostname), playbook_abs_path=abs_path, playbook_rel_path=rel_norm, - playbook_name=doc.get("name") or os.path.basename(abs_path), + playbook_name=friendly_name, credential_id=int(credential_id), variable_values=overrides_map, source="scheduled_job", @@ -677,6 +785,17 @@ class JobScheduler: scheduled_run_id=scheduled_run_row_id, scheduled_job_run_row_id=scheduled_run_row_id, ) + self._log_event( + "queued server ansible execution", + job_id=int(scheduled_job_id), + host=str(hostname), + run_id=scheduled_run_row_id, + extra={ + "run_mode": run_mode_norm, + "assembly_source": assembly_source, + "assembly_guid": assembly_guid or "", + }, + ) except Exception as exc: try: self.app.logger.warning( @@ -705,7 +824,7 @@ class JobScheduler: payload = { "run_id": uuid.uuid4().hex, "target_hostname": str(hostname), - "playbook_name": doc.get("name") or os.path.basename(abs_path), + "playbook_name": friendly_name, "playbook_content": encoded_content, "playbook_encoding": "base64", "activity_job_id": act_id, @@ -715,15 +834,30 @@ class JobScheduler: "variables": variables, "files": files, "variable_values": overrides_map, + "context": { + "scheduled_job_id": int(scheduled_job_id), + "scheduled_job_run_id": int(scheduled_run_row_id), + }, } try: self.socketio.emit("ansible_playbook_run", payload) + self._log_event( + "emitted ansible payload", + job_id=int(scheduled_job_id), + host=str(hostname), + run_id=scheduled_run_row_id, + extra={ + "run_mode": run_mode_norm, + "assembly_source": assembly_source, + "assembly_guid": assembly_guid or "", + }, + ) except Exception: pass if act_id: return { "activity_id": int(act_id), - "component_name": doc.get("name") or os.path.basename(abs_path), + "component_name": friendly_name, "component_path": rel_norm, "script_type": "ansible", "component_kind": "ansible", @@ -732,13 +866,19 @@ class JobScheduler: except Exception: pass - def _dispatch_script(self, hostname: str, component: Dict[str, Any], run_mode: str) -> Optional[Dict[str, Any]]: + def _dispatch_script( + self, + job_id: int, + run_row_id: int, + scheduled_ts: int, + hostname: str, + component: Dict[str, Any], + run_mode: str, + ) -> Optional[Dict[str, Any]]: """Emit a quick_job_run event to agents for the given script/host. Mirrors /api/scripts/quick_run behavior for scheduled jobs. """ try: - scripts_root = self._scripts_root() - import os rel_path_raw = "" if isinstance(component, dict): rel_path_raw = str(component.get("path") or component.get("script_path") or "") @@ -747,10 +887,20 @@ class JobScheduler: path_norm = (rel_path_raw or "").replace("\\", "/").strip() if path_norm and not path_norm.startswith("Scripts/"): path_norm = f"Scripts/{path_norm}" - abs_path = os.path.abspath(os.path.join(scripts_root, path_norm)) - if (not abs_path.startswith(scripts_root)) or (not self._is_valid_scripts_relpath(path_norm)) or (not os.path.isfile(abs_path)): + if not self._is_valid_scripts_relpath(path_norm): + self._log_event( + "script component path rejected", + job_id=job_id, + host=str(hostname), + run_id=run_row_id, + level="ERROR", + extra={"script_path": path_norm}, + ) return None - doc = self._load_assembly_document(abs_path, "powershell") + doc, record = self._resolve_runtime_document(path_norm, "powershell") + if not doc: + return None + assembly_source = "runtime" stype = (doc.get("type") or "powershell").lower() # For now, only PowerShell is supported by agents for scheduled jobs if stype != "powershell": @@ -778,14 +928,42 @@ class JobScheduler: env_map, variables, literal_lookup = _prepare_variable_context(doc_variables, overrides) content = _rewrite_powershell_script(content, literal_lookup) encoded_content = _encode_script_content(content) + if self._script_signer is None: + self._log_event( + "script signer unavailable; cannot dispatch payload", + job_id=job_id, + host=str(hostname), + run_id=run_row_id, + level="ERROR", + extra={"script_path": path_norm}, + ) + return None + script_bytes = content.encode("utf-8") + signature_b64: Optional[str] = None + sig_alg: Optional[str] = None + signing_key_b64: Optional[str] = None + if self._script_signer is not None: + try: + signature = self._script_signer.sign(script_bytes) + signature_b64 = base64.b64encode(signature).decode("ascii") + sig_alg = "ed25519" + signing_key_b64 = self._script_signer.public_base64_spki() + except Exception: + signature_b64 = None + sig_alg = None + signing_key_b64 = None timeout_seconds = 0 try: timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0)) except Exception: timeout_seconds = 0 + friendly_name = (doc.get("name") or "").strip() + if not friendly_name: + friendly_name = os.path.basename(path_norm) if path_norm else f"Job-{job_id}" + if not friendly_name: + friendly_name = f"Job-{job_id}" # Insert into activity_history for device for parity with Quick Job - import sqlite3 now = _now_ts() act_id = None conn = sqlite3.connect(self.db_path) @@ -799,7 +977,7 @@ class JobScheduler: ( str(hostname), path_norm, - doc.get("name") or os.path.basename(abs_path), + friendly_name, stype, now, "Running", @@ -816,7 +994,7 @@ class JobScheduler: "job_id": act_id, "target_hostname": str(hostname), "script_type": stype, - "script_name": doc.get("name") or os.path.basename(abs_path), + "script_name": friendly_name, "script_path": path_norm, "script_content": encoded_content, "script_encoding": "base64", @@ -834,22 +1012,76 @@ class JobScheduler: payload["sig_alg"] = sig_alg if signing_key_b64: payload["signing_key"] = signing_key_b64 + payload["context"] = { + "scheduled_job_id": int(job_id), + "scheduled_job_run_id": int(run_row_id), + "scheduled_ts": int(scheduled_ts or 0), + } + assembly_guid = None + metadata_block = doc.get("metadata") + if isinstance(metadata_block, dict): + assembly_guid = metadata_block.get("assembly_guid") try: self.socketio.emit("quick_job_run", payload) - except Exception: - pass + if act_id: + try: + self.socketio.emit( + "device_activity_changed", + { + "hostname": str(hostname), + "activity_id": int(act_id), + "change": "created", + "source": "scheduled_job", + }, + ) + except Exception: + pass + self._log_event( + "emitted quick job payload", + job_id=int(job_id), + host=str(hostname), + run_id=run_row_id, + extra={ + "has_signature": bool(signature_b64), + "run_mode": (run_mode or "system").strip().lower(), + "scheduled_ts": int(scheduled_ts or 0), + "assembly_source": assembly_source, + "assembly_guid": assembly_guid or "", + }, + ) + except Exception as exc: + self._log_event( + "quick job dispatch failed", + job_id=int(job_id), + host=str(hostname), + run_id=run_row_id, + level="ERROR", + extra={ + "error": str(exc), + "script_path": path_norm, + "scheduled_ts": int(scheduled_ts or 0), + "assembly_source": assembly_source, + }, + ) if act_id: return { "activity_id": int(act_id), - "component_name": doc.get("name") or os.path.basename(abs_path), + "component_name": friendly_name, "component_path": path_norm, "script_type": stype, "component_kind": "script", } return None - except Exception: + except Exception as exc: # Keep scheduler resilient - pass + self._log_event( + "unhandled exception during script dispatch", + job_id=int(job_id), + host=str(hostname), + run_id=run_row_id, + level="ERROR", + extra={"error": str(exc)}, + ) return None # ---------- DB helpers ---------- @@ -1063,46 +1295,47 @@ class JobScheduler: extra={"error": str(exc)}, ) - try: - cur.execute( - "SELECT id, job_id, target_hostname, started_ts FROM scheduled_job_runs WHERE status='Running'" - ) - rows = cur.fetchall() - self._log_event( - "evaluating running runs for simulated completion", - extra={"running_count": len(rows), "now_ts": now, "simulated_window": self.SIMULATED_RUN_SECONDS}, - ) - for rid, row_job_id, row_host, started_ts in rows: - if started_ts and (int(started_ts) + self.SIMULATED_RUN_SECONDS) <= now: - try: - c2 = conn.cursor() - c2.execute( - "UPDATE scheduled_job_runs SET finished_ts=?, status='Success', updated_at=? WHERE id=?", - (now, now, rid), - ) - conn.commit() - self._log_event( - "auto-completed simulated run", - job_id=row_job_id, - host=row_host, - run_id=rid, - extra={"started_ts": started_ts}, - ) - except Exception as exc: - self._log_event( - "failed to auto-complete simulated run", - job_id=row_job_id, - host=row_host, - run_id=rid, - level="ERROR", - extra={"error": str(exc)}, - ) - except Exception as exc: - self._log_event( - "failed to auto-complete simulated runs", - level="ERROR", - extra={"error": str(exc)}, - ) + if self.SIMULATED_RUN_SECONDS > 0: + try: + cur.execute( + "SELECT id, job_id, target_hostname, started_ts FROM scheduled_job_runs WHERE status='Running'" + ) + rows = cur.fetchall() + self._log_event( + "evaluating running runs for simulated completion", + extra={"running_count": len(rows), "now_ts": now, "simulated_window": self.SIMULATED_RUN_SECONDS}, + ) + for rid, row_job_id, row_host, started_ts in rows: + if started_ts and (int(started_ts) + self.SIMULATED_RUN_SECONDS) <= now: + try: + c2 = conn.cursor() + c2.execute( + "UPDATE scheduled_job_runs SET finished_ts=?, status='Success', updated_at=? WHERE id=?", + (now, now, rid), + ) + conn.commit() + self._log_event( + "auto-completed simulated run", + job_id=row_job_id, + host=row_host, + run_id=rid, + extra={"started_ts": started_ts}, + ) + except Exception as exc: + self._log_event( + "failed to auto-complete simulated run", + job_id=row_job_id, + host=row_host, + run_id=rid, + level="ERROR", + extra={"error": str(exc)}, + ) + except Exception as exc: + self._log_event( + "failed to auto-complete simulated runs", + level="ERROR", + extra={"error": str(exc)}, + ) try: cutoff = now - (self.RETENTION_DAYS * 86400) @@ -1452,7 +1685,7 @@ class JobScheduler: "run_mode": run_mode, }, ) - link = self._dispatch_script(host, comp, run_mode) + link = self._dispatch_script(job_id, run_row_id, occ, host, comp, run_mode) if link and link.get("activity_id"): activity_links.append({ "run_id": run_row_id, @@ -2184,9 +2417,24 @@ class JobScheduler: return {} -def register(app, socketio, db_path: str, script_signer=None, service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None) -> JobScheduler: +def register( + app, + socketio, + db_path: str, + *, + script_signer=None, + service_logger: Optional[Callable[[str, str, Optional[str]], None]] = None, + assembly_runtime: Optional[AssemblyRuntimeService] = None, +) -> JobScheduler: """Factory to create and return a JobScheduler instance.""" - return JobScheduler(app, socketio, db_path, script_signer=script_signer, service_logger=service_logger) + return JobScheduler( + app, + socketio, + db_path, + script_signer=script_signer, + service_logger=service_logger, + assembly_runtime=assembly_runtime, + ) def set_online_lookup(scheduler: JobScheduler, fn: Callable[[], List[str]]):