ENGINE: Successfully re-implemented Device Inventory

This commit is contained in:
2025-10-28 01:36:45 -06:00
parent 37fe7b6ec9
commit 8da3979e99
2 changed files with 414 additions and 1 deletions

View File

@@ -13,6 +13,7 @@ from typing import Any, Callable, Iterable, Mapping, Optional, Sequence
from flask import Blueprint, Flask, jsonify
from Modules.auth import jwt_service as jwt_service_module
from Modules.auth.device_auth import DeviceAuthManager
from Modules.auth.dpop import DPoPValidator
from Modules.auth.rate_limit import SlidingWindowRateLimiter
from Modules.crypto import signing
@@ -133,9 +134,11 @@ class LegacyServiceAdapters:
dpop_validator: DPoPValidator = field(init=False)
ip_rate_limiter: SlidingWindowRateLimiter = field(init=False)
fp_rate_limiter: SlidingWindowRateLimiter = field(init=False)
device_rate_limiter: SlidingWindowRateLimiter = field(init=False)
nonce_cache: NonceCache = field(init=False)
script_signer: Any = field(init=False)
service_log: Callable[[str, str, Optional[str]], None] = field(init=False)
device_auth_manager: DeviceAuthManager = field(init=False)
def __post_init__(self) -> None:
self.db_conn_factory = _make_db_conn_factory(self.context.database_path)
@@ -156,6 +159,15 @@ class LegacyServiceAdapters:
base = Path(self.context.database_path).resolve().parent
self.service_log = _make_service_logger(base, self.context.logger)
self.device_rate_limiter = SlidingWindowRateLimiter()
self.device_auth_manager = DeviceAuthManager(
db_conn_factory=self.db_conn_factory,
jwt_service=self.jwt_service,
dpop_validator=self.dpop_validator,
log=self.service_log,
rate_limiter=self.device_rate_limiter,
)
def _register_tokens(app: Flask, adapters: LegacyServiceAdapters) -> None:
token_routes.register(

View File

@@ -8,12 +8,15 @@ import os
import sqlite3
import threading
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from flask import Blueprint, jsonify, request, session
from flask import Blueprint, jsonify, request, session, g
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from Modules.auth.device_auth import require_device_auth
from Modules.guid_utils import normalize_guid
try:
@@ -93,6 +96,222 @@ def _row_to_site(row: Tuple[Any, ...]) -> Dict[str, Any]:
}
DEVICE_TABLE = "devices"
_DEVICE_JSON_LIST_FIELDS: Dict[str, Any] = {
"memory": [],
"network": [],
"software": [],
"storage": [],
}
_DEVICE_JSON_OBJECT_FIELDS: Dict[str, Any] = {"cpu": {}}
def _is_empty(value: Any) -> bool:
return value is None or value == "" or value == [] or value == {}
def _deep_merge_preserve(prev: Dict[str, Any], incoming: Dict[str, Any]) -> Dict[str, Any]:
out: Dict[str, Any] = dict(prev or {})
for key, value in (incoming or {}).items():
if isinstance(value, dict):
out[key] = _deep_merge_preserve(out.get(key) or {}, value)
elif isinstance(value, list):
if value:
out[key] = value
else:
if not _is_empty(value):
out[key] = value
return out
def _serialize_device_json(value: Any, default: Any) -> str:
candidate = value
if candidate is None:
candidate = default
if not isinstance(candidate, (list, dict)):
candidate = default
try:
return json.dumps(candidate)
except Exception:
try:
return json.dumps(default)
except Exception:
return "{}" if isinstance(default, dict) else "[]"
def _clean_device_str(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, (int, float)) and not isinstance(value, bool):
text = str(value)
elif isinstance(value, str):
text = value
else:
try:
text = str(value)
except Exception:
return None
text = text.strip()
return text or None
def _coerce_int(value: Any) -> Optional[int]:
if value is None:
return None
try:
if isinstance(value, str) and value.strip() == "":
return None
return int(float(value))
except (ValueError, TypeError):
return None
def _extract_device_columns(details: Dict[str, Any]) -> Dict[str, Any]:
summary = details.get("summary") or {}
payload: Dict[str, Any] = {}
for field, default in _DEVICE_JSON_LIST_FIELDS.items():
payload[field] = _serialize_device_json(details.get(field), default)
payload["cpu"] = _serialize_device_json(summary.get("cpu") or details.get("cpu"), _DEVICE_JSON_OBJECT_FIELDS["cpu"])
payload["device_type"] = _clean_device_str(summary.get("device_type") or summary.get("type"))
payload["domain"] = _clean_device_str(summary.get("domain"))
payload["external_ip"] = _clean_device_str(summary.get("external_ip") or summary.get("public_ip"))
payload["internal_ip"] = _clean_device_str(summary.get("internal_ip") or summary.get("private_ip"))
payload["last_reboot"] = _clean_device_str(summary.get("last_reboot") or summary.get("last_boot"))
payload["last_seen"] = _coerce_int(summary.get("last_seen"))
payload["last_user"] = _clean_device_str(
summary.get("last_user") or summary.get("last_user_name") or summary.get("username")
)
payload["operating_system"] = _clean_device_str(
summary.get("operating_system") or summary.get("agent_operating_system") or summary.get("os")
)
uptime_value = summary.get("uptime_sec") or summary.get("uptime_seconds") or summary.get("uptime")
payload["uptime"] = _coerce_int(uptime_value)
payload["agent_id"] = _clean_device_str(summary.get("agent_id"))
payload["ansible_ee_ver"] = _clean_device_str(summary.get("ansible_ee_ver"))
payload["connection_type"] = _clean_device_str(summary.get("connection_type") or summary.get("remote_type"))
payload["connection_endpoint"] = _clean_device_str(
summary.get("connection_endpoint")
or summary.get("connection_address")
or summary.get("address")
or summary.get("external_ip")
or summary.get("internal_ip")
)
return payload
def _device_upsert(
cur: sqlite3.Cursor,
hostname: str,
description: Optional[str],
merged_details: Dict[str, Any],
created_at: Optional[int],
*,
agent_hash: Optional[str] = None,
guid: Optional[str] = None,
) -> None:
if not hostname:
return
column_values = _extract_device_columns(merged_details or {})
normalized_description = description if description is not None else ""
try:
normalized_description = str(normalized_description)
except Exception:
normalized_description = ""
normalized_hash = _clean_device_str(agent_hash) or None
normalized_guid = _clean_device_str(guid) or None
if normalized_guid:
try:
normalized_guid = normalize_guid(normalized_guid)
except Exception:
pass
created_ts = _coerce_int(created_at)
if not created_ts:
created_ts = int(time.time())
sql = f"""
INSERT INTO {DEVICE_TABLE}(
hostname,
description,
created_at,
agent_hash,
guid,
memory,
network,
software,
storage,
cpu,
device_type,
domain,
external_ip,
internal_ip,
last_reboot,
last_seen,
last_user,
operating_system,
uptime,
agent_id,
ansible_ee_ver,
connection_type,
connection_endpoint
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(hostname) DO UPDATE SET
description=excluded.description,
created_at=COALESCE({DEVICE_TABLE}.created_at, excluded.created_at),
agent_hash=COALESCE(NULLIF(excluded.agent_hash, ''), {DEVICE_TABLE}.agent_hash),
guid=COALESCE(NULLIF(excluded.guid, ''), {DEVICE_TABLE}.guid),
memory=excluded.memory,
network=excluded.network,
software=excluded.software,
storage=excluded.storage,
cpu=excluded.cpu,
device_type=COALESCE(NULLIF(excluded.device_type, ''), {DEVICE_TABLE}.device_type),
domain=COALESCE(NULLIF(excluded.domain, ''), {DEVICE_TABLE}.domain),
external_ip=COALESCE(NULLIF(excluded.external_ip, ''), {DEVICE_TABLE}.external_ip),
internal_ip=COALESCE(NULLIF(excluded.internal_ip, ''), {DEVICE_TABLE}.internal_ip),
last_reboot=COALESCE(NULLIF(excluded.last_reboot, ''), {DEVICE_TABLE}.last_reboot),
last_seen=COALESCE(NULLIF(excluded.last_seen, 0), {DEVICE_TABLE}.last_seen),
last_user=COALESCE(NULLIF(excluded.last_user, ''), {DEVICE_TABLE}.last_user),
operating_system=COALESCE(NULLIF(excluded.operating_system, ''), {DEVICE_TABLE}.operating_system),
uptime=COALESCE(NULLIF(excluded.uptime, 0), {DEVICE_TABLE}.uptime),
agent_id=COALESCE(NULLIF(excluded.agent_id, ''), {DEVICE_TABLE}.agent_id),
ansible_ee_ver=COALESCE(NULLIF(excluded.ansible_ee_ver, ''), {DEVICE_TABLE}.ansible_ee_ver),
connection_type=COALESCE(NULLIF(excluded.connection_type, ''), {DEVICE_TABLE}.connection_type),
connection_endpoint=COALESCE(NULLIF(excluded.connection_endpoint, ''), {DEVICE_TABLE}.connection_endpoint)
"""
params: List[Any] = [
hostname,
normalized_description,
created_ts,
normalized_hash,
normalized_guid,
column_values.get("memory"),
column_values.get("network"),
column_values.get("software"),
column_values.get("storage"),
column_values.get("cpu"),
column_values.get("device_type"),
column_values.get("domain"),
column_values.get("external_ip"),
column_values.get("internal_ip"),
column_values.get("last_reboot"),
column_values.get("last_seen"),
column_values.get("last_user"),
column_values.get("operating_system"),
column_values.get("uptime"),
column_values.get("agent_id"),
column_values.get("ansible_ee_ver"),
column_values.get("connection_type"),
column_values.get("connection_endpoint"),
]
cur.execute(sql, params)
class RepositoryHashCache:
"""Lightweight GitHub head cache with on-disk persistence."""
@@ -511,6 +730,182 @@ class DeviceManagementService:
finally:
conn.close()
def save_agent_details(self) -> Tuple[Dict[str, Any], int]:
ctx = getattr(g, "device_auth", None)
if ctx is None:
self.service_log("server", "/api/agent/details missing device auth context", level="ERROR")
return {"error": "auth_context_missing"}, 500
payload = request.get_json(silent=True) or {}
details = payload.get("details")
if not isinstance(details, dict):
return {"error": "invalid payload"}, 400
hostname = _clean_device_str(payload.get("hostname"))
if not hostname:
summary_host = (details.get("summary") or {}).get("hostname")
hostname = _clean_device_str(summary_host)
if not hostname:
return {"error": "invalid payload"}, 400
agent_id = _clean_device_str(payload.get("agent_id"))
agent_hash = _clean_device_str(payload.get("agent_hash"))
raw_guid = getattr(ctx, "guid", None)
try:
auth_guid = normalize_guid(raw_guid) if raw_guid else None
except Exception:
auth_guid = None
fingerprint = _clean_device_str(getattr(ctx, "ssl_key_fingerprint", None))
fingerprint_lower = fingerprint.lower() if fingerprint else ""
scope_hint = getattr(ctx, "service_mode", None)
conn = self._db_conn()
try:
cur = conn.cursor()
columns_sql = ", ".join(f"d.{col}" for col in self._DEVICE_COLUMNS)
cur.execute(
f"SELECT {columns_sql}, d.ssl_key_fingerprint FROM {DEVICE_TABLE} AS d WHERE d.hostname = ?",
(hostname,),
)
row = cur.fetchone()
prev_details: Dict[str, Any] = {}
description = ""
created_at = 0
existing_guid = None
existing_agent_hash = None
db_fp = ""
if row:
device_tuple = row[: len(self._DEVICE_COLUMNS)]
previous = self._build_device_payload(device_tuple, (None, None, None))
try:
prev_details = json.loads(json.dumps(previous.get("details", {})))
except Exception:
prev_details = previous.get("details", {}) or {}
description = previous.get("description") or ""
created_at = _coerce_int(previous.get("created_at")) or 0
existing_guid_raw = previous.get("agent_guid") or ""
try:
existing_guid = normalize_guid(existing_guid_raw) if existing_guid_raw else None
except Exception:
existing_guid = None
existing_agent_hash = _clean_device_str(previous.get("agent_hash")) or None
db_fp = (row[-1] or "").strip().lower() if row[-1] else ""
if db_fp and fingerprint_lower and db_fp != fingerprint_lower:
self.service_log(
"server",
f"/api/agent/details fingerprint mismatch host={hostname} guid={auth_guid or existing_guid or ''}",
scope_hint,
level="WARN",
)
return {"error": "fingerprint_mismatch"}, 403
if existing_guid and auth_guid and existing_guid != auth_guid:
self.service_log(
"server",
f"/api/agent/details guid mismatch host={hostname} expected={existing_guid} provided={auth_guid}",
scope_hint,
level="WARN",
)
return {"error": "guid_mismatch"}, 403
incoming_summary = details.setdefault("summary", {})
if agent_id and not incoming_summary.get("agent_id"):
incoming_summary["agent_id"] = agent_id
if hostname and not incoming_summary.get("hostname"):
incoming_summary["hostname"] = hostname
if agent_hash:
incoming_summary["agent_hash"] = agent_hash
effective_guid = auth_guid or existing_guid
if effective_guid:
incoming_summary["agent_guid"] = effective_guid
if fingerprint:
incoming_summary.setdefault("ssl_key_fingerprint", fingerprint)
prev_summary = prev_details.get("summary") if isinstance(prev_details, dict) else {}
if isinstance(prev_summary, dict):
if _is_empty(incoming_summary.get("last_seen")) and not _is_empty(prev_summary.get("last_seen")):
try:
incoming_summary["last_seen"] = int(prev_summary.get("last_seen"))
except Exception:
pass
if _is_empty(incoming_summary.get("last_user")) and not _is_empty(prev_summary.get("last_user")):
incoming_summary["last_user"] = prev_summary.get("last_user")
merged = _deep_merge_preserve(prev_details, details)
merged_summary = merged.setdefault("summary", {})
if hostname:
merged_summary.setdefault("hostname", hostname)
if agent_id:
merged_summary.setdefault("agent_id", agent_id)
if agent_hash and _is_empty(merged_summary.get("agent_hash")):
merged_summary["agent_hash"] = agent_hash
if effective_guid:
merged_summary["agent_guid"] = effective_guid
if fingerprint:
merged_summary.setdefault("ssl_key_fingerprint", fingerprint)
if description and _is_empty(merged_summary.get("description")):
merged_summary["description"] = description
if existing_agent_hash and _is_empty(merged_summary.get("agent_hash")):
merged_summary["agent_hash"] = existing_agent_hash
if created_at <= 0:
created_at = int(time.time())
try:
merged_summary.setdefault(
"created",
datetime.fromtimestamp(created_at, timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
except Exception:
pass
merged_summary.setdefault("created_at", created_at)
_device_upsert(
cur,
hostname,
description,
merged,
created_at,
agent_hash=agent_hash or existing_agent_hash,
guid=effective_guid,
)
if effective_guid and fingerprint:
now_iso = datetime.now(timezone.utc).isoformat()
cur.execute(
"""
UPDATE devices
SET ssl_key_fingerprint = ?,
key_added_at = COALESCE(key_added_at, ?)
WHERE guid = ?
""",
(fingerprint, now_iso, effective_guid),
)
cur.execute(
"""
INSERT OR IGNORE INTO device_keys (id, guid, ssl_key_fingerprint, added_at)
VALUES (?, ?, ?, ?)
""",
(str(uuid.uuid4()), effective_guid, fingerprint, now_iso),
)
conn.commit()
return {"status": "ok"}, 200
except Exception as exc:
try:
conn.rollback()
except Exception:
pass
self.logger.debug("Failed to save agent details", exc_info=True)
self.service_log("server", f"/api/agent/details error: {exc}", scope_hint, level="ERROR")
return {"error": "internal error"}, 500
finally:
conn.close()
def get_device_details(self, hostname: str) -> Tuple[Dict[str, Any], int]:
conn = self._db_conn()
try:
@@ -1028,6 +1423,12 @@ def register_management(app, adapters: "LegacyServiceAdapters") -> None:
service = DeviceManagementService(app, adapters)
blueprint = Blueprint("devices", __name__)
@blueprint.route("/api/agent/details", methods=["POST"])
@require_device_auth(adapters.device_auth_manager)
def _agent_details():
payload, status = service.save_agent_details()
return jsonify(payload), status
@blueprint.route("/api/devices", methods=["GET"])
def _list_devices():
payload, status = service.list_devices()