Additional Changes to Ansible Logic

This commit is contained in:
2025-10-11 18:58:55 -06:00
parent c0f47075d6
commit 8cae44539c
9 changed files with 596 additions and 146 deletions

View File

@@ -19,8 +19,10 @@ Section Guide:
"""
import eventlet
# Monkey-patch stdlib for cooperative sockets
eventlet.monkey_patch()
# Monkey-patch stdlib for cooperative sockets (keep real threads for tpool usage)
eventlet.monkey_patch(thread=False)
from eventlet import tpool
import requests
import re
@@ -43,6 +45,7 @@ import subprocess
import stat
import traceback
from threading import Lock
from datetime import datetime, timezone
try:
@@ -2045,7 +2048,7 @@ def _queue_server_ansible_run(
"started_ts": _now_ts(),
}
try:
socketio.start_background_task(_execute_server_ansible_run, ctx)
socketio.start_background_task(_execute_server_ansible_run, ctx, None)
except Exception as exc:
_ansible_log_server(f"[server_run] failed to queue background task run_id={run_id}: {exc}")
_execute_server_ansible_run(ctx, immediate_error=str(exc))
@@ -2147,7 +2150,8 @@ def _execute_server_ansible_run(ctx: Dict[str, Any], immediate_error: Optional[s
f"[server_run] start run_id={run_id} host='{hostname}' playbook='{playbook_rel_path}' credential={credential_id}"
)
proc = subprocess.run(
proc = tpool.execute(
subprocess.run,
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@@ -4967,6 +4971,7 @@ def _fetch_devices(
*,
connection_type: Optional[str] = None,
hostname: Optional[str] = None,
only_agents: bool = False,
) -> List[Dict[str, Any]]:
try:
conn = _db_conn()
@@ -4986,6 +4991,8 @@ def _fetch_devices(
if hostname:
clauses.append("LOWER(d.hostname) = LOWER(?)")
params.append(hostname.lower())
if only_agents:
clauses.append("(d.connection_type IS NULL OR TRIM(d.connection_type) = '')")
if clauses:
sql += " WHERE " + " AND ".join(clauses)
cur.execute(sql, params)
@@ -5060,21 +5067,105 @@ def list_devices():
return jsonify({"devices": devices})
@app.route("/api/ssh_devices", methods=["GET", "POST"])
def api_ssh_devices():
def _upsert_remote_device(
connection_type: str,
hostname: str,
address: Optional[str],
description: Optional[str],
os_hint: Optional[str],
*,
ensure_existing_type: Optional[str],
) -> Dict[str, Any]:
conn = _db_conn()
cur = conn.cursor()
existing = _load_device_snapshot(cur, hostname=hostname)
existing_type = (existing or {}).get("summary", {}).get("connection_type") or ""
existing_type = existing_type.strip().lower()
if ensure_existing_type and existing_type != ensure_existing_type.lower():
conn.close()
raise ValueError("device not found")
if ensure_existing_type is None and existing_type and existing_type != connection_type.lower():
conn.close()
raise ValueError("device already exists with different connection type")
created_ts = existing.get("summary", {}).get("created_at") if existing else None
if not created_ts:
created_ts = _now_ts()
endpoint = address or (existing.get("summary", {}).get("connection_endpoint") if existing else "")
if not endpoint:
conn.close()
raise ValueError("address is required")
description_val = description
if description_val is None:
description_val = existing.get("summary", {}).get("description") if existing else ""
os_value = os_hint or (existing.get("summary", {}).get("operating_system") if existing else "")
os_value = os_value or ""
device_type_label = "SSH Remote" if connection_type.lower() == "ssh" else "WinRM Remote"
summary_payload = {
"connection_type": connection_type.lower(),
"connection_endpoint": endpoint,
"internal_ip": endpoint,
"external_ip": endpoint,
"device_type": device_type_label,
"operating_system": os_value,
"last_seen": 0,
}
_device_upsert(
cur,
hostname,
description_val,
{"summary": summary_payload},
created_ts,
)
conn.commit()
conn.close()
devices = _fetch_devices(hostname=hostname)
if not devices:
raise RuntimeError("failed to load device after upsert")
return devices[0]
def _delete_remote_device(connection_type: str, hostname: str) -> None:
conn = _db_conn()
cur = conn.cursor()
existing = _load_device_snapshot(cur, hostname=hostname)
existing_type = (existing or {}).get("summary", {}).get("connection_type") or ""
if (existing_type or "").strip().lower() != connection_type.lower():
conn.close()
raise ValueError("device not found")
cur.execute("DELETE FROM device_sites WHERE device_hostname = ?", (hostname,))
cur.execute(f"DELETE FROM {DEVICE_TABLE} WHERE hostname = ?", (hostname,))
conn.commit()
conn.close()
def _remote_devices_collection(connection_type: str):
chk = _require_admin()
if chk:
return chk
if request.method == "GET":
try:
devices = _fetch_devices(connection_type="ssh")
devices = _fetch_devices(connection_type=connection_type)
return jsonify({"devices": devices})
except RuntimeError as exc:
return jsonify({"error": str(exc)}), 500
data = request.get_json(silent=True) or {}
hostname = _clean_device_str(data.get("hostname")) or ""
address = _clean_device_str(data.get("address") or data.get("connection_endpoint") or data.get("endpoint")) or ""
address = _clean_device_str(
data.get("address")
or data.get("connection_endpoint")
or data.get("endpoint")
or data.get("host")
) or ""
description = _clean_device_str(data.get("description")) or ""
os_hint = _clean_device_str(data.get("operating_system") or data.get("os")) or ""
if not hostname:
@@ -5082,50 +5173,23 @@ def api_ssh_devices():
if not address:
return jsonify({"error": "address is required"}), 400
now_ts = _now_ts()
conn = None
try:
conn = _db_conn()
cur = conn.cursor()
existing = _load_device_snapshot(cur, hostname=hostname)
if existing and (existing.get("summary", {}).get("connection_type") or "").lower() not in ("", "ssh"):
conn.close()
return jsonify({"error": "Device already exists and is managed by an agent"}), 409
summary_payload = {
"connection_type": "ssh",
"connection_endpoint": address,
"internal_ip": address,
"external_ip": address,
"device_type": "SSH Remote",
"operating_system": os_hint or (existing.get("summary", {}).get("operating_system") if existing else ""),
"last_seen": 0,
}
_device_upsert(
cur,
device = _upsert_remote_device(
connection_type,
hostname,
address,
description,
{"summary": summary_payload},
now_ts,
os_hint,
ensure_existing_type=None,
)
conn.commit()
conn.close()
except ValueError as exc:
return jsonify({"error": str(exc)}), 409
except Exception as exc:
if conn:
conn.close()
return jsonify({"error": str(exc)}), 500
try:
devices = _fetch_devices(hostname=hostname)
except RuntimeError as exc:
return jsonify({"error": str(exc)}), 500
device = devices[0] if devices else None
return jsonify({"device": device}), 201
@app.route("/api/ssh_devices/<hostname>", methods=["PUT", "DELETE"])
def api_ssh_device_detail(hostname: str):
def _remote_device_detail(connection_type: str, hostname: str):
chk = _require_admin()
if chk:
return chk
@@ -5133,71 +5197,71 @@ def api_ssh_device_detail(hostname: str):
if not normalized_host:
return jsonify({"error": "invalid hostname"}), 400
conn = None
if request.method == "DELETE":
try:
_delete_remote_device(connection_type, normalized_host)
except ValueError as exc:
return jsonify({"error": str(exc)}), 404
except Exception as exc:
return jsonify({"error": str(exc)}), 500
return jsonify({"status": "ok"})
data = request.get_json(silent=True) or {}
address = _clean_device_str(
data.get("address")
or data.get("connection_endpoint")
or data.get("endpoint")
)
description = data.get("description")
os_hint = _clean_device_str(data.get("operating_system") or data.get("os"))
if address is None and description is None and os_hint is None:
return jsonify({"error": "no fields to update"}), 400
try:
conn = _db_conn()
cur = conn.cursor()
existing = _load_device_snapshot(cur, hostname=normalized_host)
if not existing:
conn.close()
return jsonify({"error": "device not found"}), 404
existing_type = (existing.get("summary", {}).get("connection_type") or "").lower()
if existing_type != "ssh":
conn.close()
return jsonify({"error": "device is not managed as SSH"}), 400
if request.method == "DELETE":
cur.execute("DELETE FROM device_sites WHERE device_hostname = ?", (normalized_host,))
cur.execute(f"DELETE FROM {DEVICE_TABLE} WHERE hostname = ?", (normalized_host,))
conn.commit()
conn.close()
return jsonify({"status": "ok"})
data = request.get_json(silent=True) or {}
new_address = _clean_device_str(data.get("address") or data.get("connection_endpoint") or data.get("endpoint"))
new_description = data.get("description")
new_os = _clean_device_str(data.get("operating_system") or data.get("os"))
summary = existing.get("summary", {})
description_value = summary.get("description") or existing.get("description") or ""
if new_description is not None:
try:
description_value = str(new_description).strip()
except Exception:
description_value = summary.get("description") or ""
endpoint_value = new_address or summary.get("connection_endpoint") or ""
os_value = new_os or summary.get("operating_system") or ""
summary_payload = {
"connection_type": "ssh",
"connection_endpoint": endpoint_value,
"internal_ip": endpoint_value or summary.get("internal_ip") or "",
"external_ip": endpoint_value or summary.get("external_ip") or "",
"device_type": summary.get("device_type") or "SSH Remote",
"operating_system": os_value,
"last_seen": 0,
}
created_ts = summary.get("created_at") or existing.get("created_at") or _now_ts()
_device_upsert(
cur,
device = _upsert_remote_device(
connection_type,
normalized_host,
description_value,
{"summary": summary_payload},
created_ts,
address if address is not None else "",
description,
os_hint,
ensure_existing_type=connection_type,
)
conn.commit()
conn.close()
except ValueError as exc:
return jsonify({"error": str(exc)}), 404
except Exception as exc:
if conn:
conn.close()
return jsonify({"error": str(exc)}), 500
return jsonify({"device": device})
@app.route("/api/ssh_devices", methods=["GET", "POST"])
def api_ssh_devices():
return _remote_devices_collection("ssh")
@app.route("/api/ssh_devices/<hostname>", methods=["PUT", "DELETE"])
def api_ssh_device_detail(hostname: str):
return _remote_device_detail("ssh", hostname)
@app.route("/api/winrm_devices", methods=["GET", "POST"])
def api_winrm_devices():
return _remote_devices_collection("winrm")
@app.route("/api/winrm_devices/<hostname>", methods=["PUT", "DELETE"])
def api_winrm_device_detail(hostname: str):
return _remote_device_detail("winrm", hostname)
@app.route("/api/agent_devices", methods=["GET"])
def api_agent_devices():
chk = _require_admin()
if chk:
return chk
try:
devices = _fetch_devices(hostname=normalized_host)
devices = _fetch_devices(only_agents=True)
return jsonify({"devices": devices})
except RuntimeError as exc:
return jsonify({"error": str(exc)}), 500
device = devices[0] if devices else None
return jsonify({"device": device})
# Endpoint: /api/devices/<guid> — methods GET.
@@ -5483,6 +5547,25 @@ def _persist_last_seen(hostname: str, last_seen: int, agent_id: str = None):
print(f"[WARN] Failed to persist last_seen for {hostname}: {e}")
def _normalize_guid(value: Optional[str]) -> str:
candidate = (value or "").strip()
if not candidate:
return ""
candidate = candidate.replace("{", "").replace("}", "")
try:
upper = candidate.upper()
if upper.count("-") == 4 and len(upper) == 36:
return upper
if len(candidate) == 32 and all(c in "0123456789abcdefABCDEF" for c in candidate):
grouped = "-".join(
[candidate[0:8], candidate[8:12], candidate[12:16], candidate[16:20], candidate[20:32]]
)
return grouped.upper()
except Exception:
pass
return candidate.upper()
def load_agents_from_db():
"""Populate registered_agents with any devices stored in the database."""
try:
@@ -5587,16 +5670,6 @@ def _device_rows_for_agent(cur, agent_id: str) -> List[Dict[str, Any]]:
return results
def _normalize_guid(value: Optional[str]) -> str:
candidate = (value or "").strip()
if not candidate:
return ""
try:
return str(uuid.UUID(candidate))
except Exception:
return candidate
def _ensure_agent_guid_for_hostname(cur, hostname: str, agent_id: Optional[str] = None) -> Optional[str]:
normalized_host = (hostname or "").strip()
if not normalized_host: