Fleshing-Out Implementation of Credential Management for Ansible Playbooks

This commit is contained in:
2025-10-11 02:14:56 -06:00
parent b07f52dbb5
commit 01202e8ac2
10 changed files with 2310 additions and 110 deletions

View File

@@ -309,6 +309,8 @@ class JobScheduler:
self.RETENTION_DAYS = int(os.environ.get("BOREALIS_JOB_HISTORY_DAYS", "30"))
# Callback to retrieve current set of online hostnames
self._online_lookup: Optional[Callable[[], List[str]]] = None
# Optional callback to execute Ansible directly from the server
self._server_ansible_runner: Optional[Callable[..., str]] = None
# Ensure run-history table exists
self._init_tables()
@@ -475,7 +477,15 @@ class JobScheduler:
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Ansible_Playbooks")
)
def _dispatch_ansible(self, hostname: str, component: Dict[str, Any], scheduled_job_id: int, scheduled_run_id: int) -> Optional[Dict[str, Any]]:
def _dispatch_ansible(
self,
hostname: str,
component: Dict[str, Any],
scheduled_job_id: int,
scheduled_run_row_id: int,
run_mode: str,
credential_id: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
try:
import os, uuid
ans_root = self._ansible_root()
@@ -511,6 +521,8 @@ class JobScheduler:
encoded_content = _encode_script_content(content)
variables = doc.get("variables") or []
files = doc.get("files") or []
run_mode_norm = (run_mode or "system").strip().lower()
server_run = run_mode_norm in ("ssh", "winrm")
# Record in activity_history for UI parity
now = _now_ts()
@@ -539,24 +551,68 @@ class JobScheduler:
finally:
conn.close()
payload = {
"run_id": uuid.uuid4().hex,
"target_hostname": str(hostname),
"playbook_name": doc.get("name") or os.path.basename(abs_path),
"playbook_content": encoded_content,
"playbook_encoding": "base64",
"activity_job_id": act_id,
"scheduled_job_id": int(scheduled_job_id),
"scheduled_run_id": int(scheduled_run_id),
"connection": "winrm",
"variables": variables,
"files": files,
"variable_values": overrides_map,
}
try:
self.socketio.emit("ansible_playbook_run", payload)
except Exception:
pass
if server_run:
if not credential_id:
raise RuntimeError("Remote execution requires a credential_id")
if not callable(self._server_ansible_runner):
raise RuntimeError("Server-side Ansible runner is not configured")
try:
self._server_ansible_runner(
hostname=str(hostname),
playbook_abs_path=abs_path,
playbook_rel_path=rel_norm,
playbook_name=doc.get("name") or os.path.basename(abs_path),
credential_id=int(credential_id),
variable_values=overrides_map,
source="scheduled_job",
activity_id=act_id,
scheduled_job_id=scheduled_job_id,
scheduled_run_id=scheduled_run_row_id,
scheduled_job_run_row_id=scheduled_run_row_id,
)
except Exception as exc:
try:
self.app.logger.warning(
"[Scheduler] Server-side Ansible queue failed job=%s run=%s host=%s err=%s",
scheduled_job_id,
scheduled_run_row_id,
hostname,
exc,
)
except Exception:
print(f"[Scheduler] Server-side Ansible queue failed job={scheduled_job_id} host={hostname} err={exc}")
if act_id:
try:
conn_fail = self._conn()
cur_fail = conn_fail.cursor()
cur_fail.execute(
"UPDATE activity_history SET status='Failed', stderr=?, ran_at=? WHERE id=?",
(str(exc), _now_ts(), act_id),
)
conn_fail.commit()
conn_fail.close()
except Exception:
pass
raise
else:
payload = {
"run_id": uuid.uuid4().hex,
"target_hostname": str(hostname),
"playbook_name": doc.get("name") or os.path.basename(abs_path),
"playbook_content": encoded_content,
"playbook_encoding": "base64",
"activity_job_id": act_id,
"scheduled_job_id": int(scheduled_job_id),
"scheduled_run_id": int(scheduled_run_row_id),
"connection": "winrm",
"variables": variables,
"files": files,
"variable_values": overrides_map,
}
try:
self.socketio.emit("ansible_playbook_run", payload)
except Exception:
pass
if act_id:
return {
"activity_id": int(act_id),
@@ -898,7 +954,7 @@ class JobScheduler:
pass
try:
cur.execute(
"SELECT id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC"
"SELECT id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, credential_id, created_at FROM scheduled_jobs WHERE enabled=1 ORDER BY id ASC"
)
jobs = cur.fetchall()
except Exception:
@@ -916,7 +972,7 @@ class JobScheduler:
five_min = 300
now_min = _now_minute()
for (job_id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, created_at) in jobs:
for (job_id, components_json, targets_json, schedule_type, start_ts, expiration, execution_context, credential_id, created_at) in jobs:
try:
# Targets list for this job
try:
@@ -951,6 +1007,11 @@ class JobScheduler:
except Exception:
continue
run_mode = (execution_context or "system").strip().lower()
job_credential_id = None
try:
job_credential_id = int(credential_id) if credential_id is not None else None
except Exception:
job_credential_id = None
exp_seconds = _parse_expiration(expiration)
@@ -1037,54 +1098,78 @@ class JobScheduler:
run_row_id = c2.lastrowid or 0
conn2.commit()
activity_links: List[Dict[str, Any]] = []
# Dispatch all script components for this job to the target host
for comp in script_components:
try:
link = self._dispatch_script(host, comp, run_mode)
if link and link.get("activity_id"):
activity_links.append({
"run_id": run_row_id,
"activity_id": int(link["activity_id"]),
"component_kind": link.get("component_kind") or "script",
"script_type": link.get("script_type") or "powershell",
"component_path": link.get("component_path") or "",
"component_name": link.get("component_name") or "",
})
except Exception:
continue
# Dispatch ansible playbooks for this job to the target host
for comp in ansible_components:
try:
link = self._dispatch_ansible(host, comp, job_id, run_row_id)
if link and link.get("activity_id"):
activity_links.append({
"run_id": run_row_id,
"activity_id": int(link["activity_id"]),
"component_kind": link.get("component_kind") or "ansible",
"script_type": link.get("script_type") or "ansible",
"component_path": link.get("component_path") or "",
"component_name": link.get("component_name") or "",
})
except Exception:
continue
if activity_links:
try:
for link in activity_links:
c2.execute(
"INSERT OR IGNORE INTO scheduled_job_run_activity(run_id, activity_id, component_kind, script_type, component_path, component_name, created_at) VALUES (?,?,?,?,?,?,?)",
(
int(link["run_id"]),
int(link["activity_id"]),
link.get("component_kind") or "",
link.get("script_type") or "",
link.get("component_path") or "",
link.get("component_name") or "",
ts_now,
),
remote_requires_cred = run_mode in ("ssh", "winrm")
if remote_requires_cred and not job_credential_id:
err_msg = "Credential required for remote execution"
c2.execute(
"UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?",
(ts_now, ts_now, err_msg, run_row_id),
)
conn2.commit()
else:
# Dispatch all script components for this job to the target host
for comp in script_components:
try:
link = self._dispatch_script(host, comp, run_mode)
if link and link.get("activity_id"):
activity_links.append({
"run_id": run_row_id,
"activity_id": int(link["activity_id"]),
"component_kind": link.get("component_kind") or "script",
"script_type": link.get("script_type") or "powershell",
"component_path": link.get("component_path") or "",
"component_name": link.get("component_name") or "",
})
except Exception:
continue
# Dispatch ansible playbooks for this job to the target host
for comp in ansible_components:
try:
link = self._dispatch_ansible(
host,
comp,
job_id,
run_row_id,
run_mode,
job_credential_id,
)
conn2.commit()
except Exception:
pass
if link and link.get("activity_id"):
activity_links.append({
"run_id": run_row_id,
"activity_id": int(link["activity_id"]),
"component_kind": link.get("component_kind") or "ansible",
"script_type": link.get("script_type") or "ansible",
"component_path": link.get("component_path") or "",
"component_name": link.get("component_name") or "",
})
except Exception as exc:
try:
c2.execute(
"UPDATE scheduled_job_runs SET status='Failed', finished_ts=?, updated_at=?, error=? WHERE id=?",
(ts_now, ts_now, str(exc)[:512], run_row_id),
)
conn2.commit()
except Exception:
pass
continue
if activity_links:
try:
for link in activity_links:
c2.execute(
"INSERT OR IGNORE INTO scheduled_job_run_activity(run_id, activity_id, component_kind, script_type, component_path, component_name, created_at) VALUES (?,?,?,?,?,?,?)",
(
int(link["run_id"]),
int(link["activity_id"]),
link.get("component_kind") or "",
link.get("script_type") or "",
link.get("component_path") or "",
link.get("component_name") or "",
ts_now,
),
)
conn2.commit()
except Exception:
pass
except Exception:
pass
finally:
@@ -1157,9 +1242,10 @@ class JobScheduler:
"duration_stop_enabled": bool(r[6] or 0),
"expiration": r[7] or "no_expire",
"execution_context": r[8] or "system",
"enabled": bool(r[9] or 0),
"created_at": r[10] or 0,
"updated_at": r[11] or 0,
"credential_id": r[9],
"enabled": bool(r[10] or 0),
"created_at": r[11] or 0,
"updated_at": r[12] or 0,
}
# Attach computed status summary for latest occurrence
try:
@@ -1236,7 +1322,7 @@ class JobScheduler:
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
duration_stop_enabled, expiration, execution_context, credential_id, enabled, created_at, updated_at
FROM scheduled_jobs
ORDER BY created_at DESC
"""
@@ -1259,6 +1345,11 @@ class JobScheduler:
duration_stop_enabled = int(bool((data.get("duration") or {}).get("stopAfterEnabled") or data.get("duration_stop_enabled")))
expiration = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire"
execution_context = (data.get("execution_context") or "system").strip().lower()
credential_id = data.get("credential_id")
try:
credential_id = int(credential_id) if credential_id is not None else None
except Exception:
credential_id = None
enabled = int(bool(data.get("enabled", True)))
if not name or not components or not targets:
return json.dumps({"error": "name, components, targets required"}), 400, {"Content-Type": "application/json"}
@@ -1269,8 +1360,8 @@ class JobScheduler:
cur.execute(
"""
INSERT INTO scheduled_jobs
(name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
(name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, enabled, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
""",
(
name,
@@ -1281,6 +1372,7 @@ class JobScheduler:
duration_stop_enabled,
expiration,
execution_context,
credential_id,
enabled,
now,
now,
@@ -1291,7 +1383,7 @@ class JobScheduler:
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
duration_stop_enabled, expiration, execution_context, credential_id, enabled, created_at, updated_at
FROM scheduled_jobs WHERE id=?
""",
(job_id,),
@@ -1310,7 +1402,7 @@ class JobScheduler:
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
duration_stop_enabled, expiration, execution_context, credential_id, enabled, created_at, updated_at
FROM scheduled_jobs WHERE id=?
""",
(job_id,),
@@ -1344,6 +1436,15 @@ class JobScheduler:
fields["expiration"] = (data.get("duration") or {}).get("expiration") or data.get("expiration") or "no_expire"
if "execution_context" in data:
fields["execution_context"] = (data.get("execution_context") or "system").strip().lower()
if "credential_id" in data:
cred_val = data.get("credential_id")
if cred_val in (None, "", "null"):
fields["credential_id"] = None
else:
try:
fields["credential_id"] = int(cred_val)
except Exception:
fields["credential_id"] = None
if "enabled" in data:
fields["enabled"] = int(bool(data.get("enabled")))
if not fields:
@@ -1361,7 +1462,7 @@ class JobScheduler:
cur.execute(
"""
SELECT id, name, components_json, targets_json, schedule_type, start_ts,
duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at
duration_stop_enabled, expiration, execution_context, credential_id, enabled, created_at, updated_at
FROM scheduled_jobs WHERE id=?
""",
(job_id,),
@@ -1385,7 +1486,7 @@ class JobScheduler:
return json.dumps({"error": "not found"}), 404, {"Content-Type": "application/json"}
conn.commit()
cur.execute(
"SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=?",
"SELECT id, name, components_json, targets_json, schedule_type, start_ts, duration_stop_enabled, expiration, execution_context, credential_id, enabled, created_at, updated_at FROM scheduled_jobs WHERE id=?",
(job_id,),
)
row = cur.fetchone()
@@ -1633,3 +1734,7 @@ def register(app, socketio, db_path: str) -> JobScheduler:
def set_online_lookup(scheduler: JobScheduler, fn: Callable[[], List[str]]):
scheduler._online_lookup = fn
def set_server_ansible_runner(scheduler: JobScheduler, fn: Callable[..., str]):
scheduler._server_ansible_runner = fn