Implemented Filter-Based Device Targeting for Scheduled Jobs

2025-11-20 19:59:26 -07:00
parent cae497087a
commit 636e944162
5 changed files with 1087 additions and 89 deletions


@@ -17,9 +17,10 @@ import re
import sqlite3
import time
import uuid
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
from ...assemblies.service import AssemblyRuntimeService
from ...filters.matcher import DeviceFilterMatcher
_WINRM_USERNAME_VAR = "__borealis_winrm_username"
_WINRM_PASSWORD_VAR = "__borealis_winrm_password"
@@ -335,6 +336,7 @@ class JobScheduler:
self.app = app
self.socketio = socketio
self.db_path = db_path
self._filter_matcher = DeviceFilterMatcher(db_path=db_path)
self._script_signer = script_signer
self._running = False
self._service_log = service_logger
@@ -364,6 +366,16 @@ class JobScheduler:
},
)
def _targets_include_filters(self, entries: Sequence[Any]) -> bool:
if not isinstance(entries, (list, tuple)):
return False
for entry in entries:
if isinstance(entry, dict):
kind = str(entry.get("kind") or entry.get("type") or "").strip().lower()
if kind == "filter" or entry.get("filter_id") is not None:
return True
return False
def _log_event(
self,
message: str,
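
As a reading aid, here is a minimal sketch of the two target shapes that _targets_include_filters distinguishes; the hostnames, filter id, and the scheduler instance below are illustrative stand-ins, not values from this commit:

# Hypothetical target lists; "scheduler" stands in for a JobScheduler instance.
explicit_hosts = ["WS-ALPHA-01", "WS-ALPHA-02"]     # plain hostname strings
mixed_targets = [
    "WS-ALPHA-01",
    {"kind": "filter", "filter_id": 12, "name": "All Lab Devices"},
]

scheduler._targets_include_filters(explicit_hosts)  # False: no dict entry describes a filter
scheduler._targets_include_filters(mixed_targets)   # True: second entry has kind == "filter"
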
@@ -1403,6 +1415,7 @@ class JobScheduler:
)
now_min = _now_minute()
device_inventory_cache: Optional[List[Dict[str, Any]]] = None
for (
job_id,
@@ -1418,11 +1431,36 @@ class JobScheduler:
) in jobs:
try:
try:
targets = json.loads(targets_json or "[]")
raw_targets = json.loads(targets_json or "[]")
except Exception as exc:
raw_targets = []
self._log_event(
"failed to parse targets JSON for job",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)
include_filters = self._targets_include_filters(raw_targets)
if include_filters and device_inventory_cache is None:
try:
device_inventory_cache = self._filter_matcher.fetch_devices()
except Exception as exc:
device_inventory_cache = []
self._log_event(
"failed to load device inventory for filter targets",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)
try:
targets, _meta = self._filter_matcher.resolve_target_entries(
raw_targets,
devices=device_inventory_cache if include_filters else None,
)
except Exception as exc:
targets = []
self._log_event(
"failed to parse targets JSON for job",
"failed to resolve job targets",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
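
The device_inventory_cache above is shared by every job processed in one scheduler pass, so the inventory query runs at most once per pass even when several jobs use filter targets. Below is a condensed sketch of that lazy-load pattern, where targets_per_job, targets_include_filters, and matcher are illustrative stand-ins for the committed attributes:

# Lazy per-pass inventory cache (condensed illustration, not the committed code).
device_inventory_cache = None                               # reset at the start of each pass
for raw_targets in targets_per_job:                         # hypothetical: parsed target lists per job
    include_filters = targets_include_filters(raw_targets)  # stand-in for the helper above
    if include_filters and device_inventory_cache is None:
        device_inventory_cache = matcher.fetch_devices()    # single inventory load per pass
    targets, _meta = matcher.resolve_target_entries(
        raw_targets,
        devices=device_inventory_cache if include_filters else None,
    )
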
@@ -1995,6 +2033,68 @@ class JobScheduler:
base["next_run_ts"] = None
return base
def _normalize_targets_for_save(raw_targets: Any) -> List[Any]:
normalized: List[Any] = []
if not isinstance(raw_targets, list):
raw_list = [raw_targets]
else:
raw_list = raw_targets
seen_hosts: set[str] = set()
seen_filters: set[int] = set()
for entry in raw_list:
if isinstance(entry, str):
host = entry.strip()
if not host:
continue
lowered = host.lower()
if lowered in seen_hosts:
continue
seen_hosts.add(lowered)
normalized.append(host)
continue
if isinstance(entry, (int, float)):
host = str(entry).strip()
if not host:
continue
lowered = host.lower()
if lowered in seen_hosts:
continue
seen_hosts.add(lowered)
normalized.append(host)
continue
if not isinstance(entry, dict):
continue
kind = str(entry.get("kind") or entry.get("type") or "").strip().lower()
if kind == "filter" or entry.get("filter_id") is not None:
filter_id = entry.get("filter_id") or entry.get("id")
try:
filter_id_int = int(filter_id)
except (TypeError, ValueError):
continue
if filter_id_int in seen_filters:
continue
seen_filters.add(filter_id_int)
normalized.append(
{
"kind": "filter",
"filter_id": filter_id_int,
"name": entry.get("name"),
"site_scope": entry.get("site_scope") or entry.get("scope") or "global",
"site": entry.get("site") or entry.get("site_name"),
}
)
continue
hostname = entry.get("hostname")
if hostname:
host = str(hostname).strip()
if host:
lowered = host.lower()
if lowered in seen_hosts:
continue
seen_hosts.add(lowered)
normalized.append(host)
return normalized
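
A rough before/after sketch of what _normalize_targets_for_save produces; every hostname, id, and name below is made up for illustration:

# Hypothetical payload as it might arrive from the scheduler UI.
raw = [
    "WS-ALPHA-01",
    "ws-alpha-01",                                    # case-insensitive duplicate -> dropped
    {"hostname": "WS-BETA-02"},                       # device dict -> collapsed to its hostname
    {"kind": "filter", "id": 12, "name": "Lab PCs"},  # filter entry keyed by "id"
    {"type": "filter", "filter_id": "12"},            # same filter id again -> dropped
]
_normalize_targets_for_save(raw)
# -> ["WS-ALPHA-01", "WS-BETA-02",
#     {"kind": "filter", "filter_id": 12, "name": "Lab PCs", "site_scope": "global", "site": None}]
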
@app.route("/api/scheduled_jobs", methods=["GET"])
def api_scheduled_jobs_list():
try:
@@ -2020,7 +2120,7 @@ class JobScheduler:
data = self._json_body()
name = (data.get("name") or "").strip()
components = data.get("components") or []
targets = data.get("targets") or []
targets = _normalize_targets_for_save(data.get("targets") or [])
schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower()
start = data.get("schedule", {}).get("start") or data.get("start") or None
start_ts = _parse_ts(start) if start else None
@@ -2109,7 +2209,10 @@ class JobScheduler:
if "components" in data:
fields["components_json"] = json.dumps(data.get("components") or [])
if "targets" in data:
fields["targets_json"] = json.dumps(data.get("targets") or [])
normalized_targets = _normalize_targets_for_save(data.get("targets") or [])
if not normalized_targets:
return json.dumps({"error": "targets required"}), 400, {"Content-Type": "application/json"}
fields["targets_json"] = json.dumps(normalized_targets)
if "schedule" in data or "schedule_type" in data:
schedule_type = (data.get("schedule", {}).get("type") or data.get("schedule_type") or "immediately").strip().lower()
fields["schedule_type"] = schedule_type
@@ -2259,10 +2362,19 @@ class JobScheduler:
conn.close()
return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"}
try:
targets = json.loads(row[0] or "[]")
raw_targets = json.loads(row[0] or "[]")
except Exception:
targets = []
targets = [str(t) for t in targets if isinstance(t, (str, int))]
raw_targets = []
try:
targets, target_meta = self._filter_matcher.resolve_target_entries(raw_targets)
except Exception as exc:
self._log_event(
"failed to resolve targets for devices endpoint",
job_id=job_id,
level="ERROR",
extra={"error": str(exc)},
)
targets = [str(t) for t in raw_targets if isinstance(t, (str, int))]
# Determine occurrence if not provided
if occ is None:
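
For the devices endpoint, the except branch above falls back to the literal hostname entries stored in the job row, which means filter entries are simply dropped when resolution fails; a small illustration with made-up values:

# Hypothetical stored targets for a job row.
raw_targets = ["WS-ALPHA-01", {"kind": "filter", "filter_id": 12}, 42]
# Fallback used above when resolve_target_entries raises:
[str(t) for t in raw_targets if isinstance(t, (str, int))]
# -> ["WS-ALPHA-01", "42"]   (the filter dict is ignored in the fallback)
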