diff --git a/Data/Engine/services/API/__init__.py b/Data/Engine/services/API/__init__.py index bf520bbd..de482eee 100644 --- a/Data/Engine/services/API/__init__.py +++ b/Data/Engine/services/API/__init__.py @@ -13,6 +13,7 @@ from typing import Any, Callable, Iterable, Mapping, Optional, Sequence from flask import Blueprint, Flask, jsonify from Modules.auth import jwt_service as jwt_service_module +from Modules.auth.device_auth import DeviceAuthManager from Modules.auth.dpop import DPoPValidator from Modules.auth.rate_limit import SlidingWindowRateLimiter from Modules.crypto import signing @@ -133,9 +134,11 @@ class LegacyServiceAdapters: dpop_validator: DPoPValidator = field(init=False) ip_rate_limiter: SlidingWindowRateLimiter = field(init=False) fp_rate_limiter: SlidingWindowRateLimiter = field(init=False) + device_rate_limiter: SlidingWindowRateLimiter = field(init=False) nonce_cache: NonceCache = field(init=False) script_signer: Any = field(init=False) service_log: Callable[[str, str, Optional[str]], None] = field(init=False) + device_auth_manager: DeviceAuthManager = field(init=False) def __post_init__(self) -> None: self.db_conn_factory = _make_db_conn_factory(self.context.database_path) @@ -156,6 +159,15 @@ class LegacyServiceAdapters: base = Path(self.context.database_path).resolve().parent self.service_log = _make_service_logger(base, self.context.logger) + self.device_rate_limiter = SlidingWindowRateLimiter() + self.device_auth_manager = DeviceAuthManager( + db_conn_factory=self.db_conn_factory, + jwt_service=self.jwt_service, + dpop_validator=self.dpop_validator, + log=self.service_log, + rate_limiter=self.device_rate_limiter, + ) + def _register_tokens(app: Flask, adapters: LegacyServiceAdapters) -> None: token_routes.register( diff --git a/Data/Engine/services/API/devices/management.py b/Data/Engine/services/API/devices/management.py index 38b14fd3..3acedc59 100644 --- a/Data/Engine/services/API/devices/management.py +++ b/Data/Engine/services/API/devices/management.py @@ -8,12 +8,15 @@ import os import sqlite3 import threading import time +import uuid +from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple -from flask import Blueprint, jsonify, request, session +from flask import Blueprint, jsonify, request, session, g from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer +from Modules.auth.device_auth import require_device_auth from Modules.guid_utils import normalize_guid try: @@ -93,6 +96,222 @@ def _row_to_site(row: Tuple[Any, ...]) -> Dict[str, Any]: } +DEVICE_TABLE = "devices" +_DEVICE_JSON_LIST_FIELDS: Dict[str, Any] = { + "memory": [], + "network": [], + "software": [], + "storage": [], +} +_DEVICE_JSON_OBJECT_FIELDS: Dict[str, Any] = {"cpu": {}} + + +def _is_empty(value: Any) -> bool: + return value is None or value == "" or value == [] or value == {} + + +def _deep_merge_preserve(prev: Dict[str, Any], incoming: Dict[str, Any]) -> Dict[str, Any]: + out: Dict[str, Any] = dict(prev or {}) + for key, value in (incoming or {}).items(): + if isinstance(value, dict): + out[key] = _deep_merge_preserve(out.get(key) or {}, value) + elif isinstance(value, list): + if value: + out[key] = value + else: + if not _is_empty(value): + out[key] = value + return out + + +def _serialize_device_json(value: Any, default: Any) -> str: + candidate = value + if candidate is None: + candidate = default + if not isinstance(candidate, (list, dict)): + candidate = default + try: + return json.dumps(candidate) + except Exception: + try: + return json.dumps(default) + except Exception: + return "{}" if isinstance(default, dict) else "[]" + + +def _clean_device_str(value: Any) -> Optional[str]: + if value is None: + return None + if isinstance(value, (int, float)) and not isinstance(value, bool): + text = str(value) + elif isinstance(value, str): + text = value + else: + try: + text = str(value) + except Exception: + return None + text = text.strip() + return text or None + + +def _coerce_int(value: Any) -> Optional[int]: + if value is None: + return None + try: + if isinstance(value, str) and value.strip() == "": + return None + return int(float(value)) + except (ValueError, TypeError): + return None + + +def _extract_device_columns(details: Dict[str, Any]) -> Dict[str, Any]: + summary = details.get("summary") or {} + payload: Dict[str, Any] = {} + + for field, default in _DEVICE_JSON_LIST_FIELDS.items(): + payload[field] = _serialize_device_json(details.get(field), default) + payload["cpu"] = _serialize_device_json(summary.get("cpu") or details.get("cpu"), _DEVICE_JSON_OBJECT_FIELDS["cpu"]) + + payload["device_type"] = _clean_device_str(summary.get("device_type") or summary.get("type")) + payload["domain"] = _clean_device_str(summary.get("domain")) + payload["external_ip"] = _clean_device_str(summary.get("external_ip") or summary.get("public_ip")) + payload["internal_ip"] = _clean_device_str(summary.get("internal_ip") or summary.get("private_ip")) + payload["last_reboot"] = _clean_device_str(summary.get("last_reboot") or summary.get("last_boot")) + payload["last_seen"] = _coerce_int(summary.get("last_seen")) + payload["last_user"] = _clean_device_str( + summary.get("last_user") or summary.get("last_user_name") or summary.get("username") + ) + payload["operating_system"] = _clean_device_str( + summary.get("operating_system") or summary.get("agent_operating_system") or summary.get("os") + ) + uptime_value = summary.get("uptime_sec") or summary.get("uptime_seconds") or summary.get("uptime") + payload["uptime"] = _coerce_int(uptime_value) + payload["agent_id"] = _clean_device_str(summary.get("agent_id")) + payload["ansible_ee_ver"] = _clean_device_str(summary.get("ansible_ee_ver")) + payload["connection_type"] = _clean_device_str(summary.get("connection_type") or summary.get("remote_type")) + payload["connection_endpoint"] = _clean_device_str( + summary.get("connection_endpoint") + or summary.get("connection_address") + or summary.get("address") + or summary.get("external_ip") + or summary.get("internal_ip") + ) + return payload + + +def _device_upsert( + cur: sqlite3.Cursor, + hostname: str, + description: Optional[str], + merged_details: Dict[str, Any], + created_at: Optional[int], + *, + agent_hash: Optional[str] = None, + guid: Optional[str] = None, +) -> None: + if not hostname: + return + column_values = _extract_device_columns(merged_details or {}) + + normalized_description = description if description is not None else "" + try: + normalized_description = str(normalized_description) + except Exception: + normalized_description = "" + + normalized_hash = _clean_device_str(agent_hash) or None + normalized_guid = _clean_device_str(guid) or None + if normalized_guid: + try: + normalized_guid = normalize_guid(normalized_guid) + except Exception: + pass + + created_ts = _coerce_int(created_at) + if not created_ts: + created_ts = int(time.time()) + + sql = f""" + INSERT INTO {DEVICE_TABLE}( + hostname, + description, + created_at, + agent_hash, + guid, + memory, + network, + software, + storage, + cpu, + device_type, + domain, + external_ip, + internal_ip, + last_reboot, + last_seen, + last_user, + operating_system, + uptime, + agent_id, + ansible_ee_ver, + connection_type, + connection_endpoint + ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) + ON CONFLICT(hostname) DO UPDATE SET + description=excluded.description, + created_at=COALESCE({DEVICE_TABLE}.created_at, excluded.created_at), + agent_hash=COALESCE(NULLIF(excluded.agent_hash, ''), {DEVICE_TABLE}.agent_hash), + guid=COALESCE(NULLIF(excluded.guid, ''), {DEVICE_TABLE}.guid), + memory=excluded.memory, + network=excluded.network, + software=excluded.software, + storage=excluded.storage, + cpu=excluded.cpu, + device_type=COALESCE(NULLIF(excluded.device_type, ''), {DEVICE_TABLE}.device_type), + domain=COALESCE(NULLIF(excluded.domain, ''), {DEVICE_TABLE}.domain), + external_ip=COALESCE(NULLIF(excluded.external_ip, ''), {DEVICE_TABLE}.external_ip), + internal_ip=COALESCE(NULLIF(excluded.internal_ip, ''), {DEVICE_TABLE}.internal_ip), + last_reboot=COALESCE(NULLIF(excluded.last_reboot, ''), {DEVICE_TABLE}.last_reboot), + last_seen=COALESCE(NULLIF(excluded.last_seen, 0), {DEVICE_TABLE}.last_seen), + last_user=COALESCE(NULLIF(excluded.last_user, ''), {DEVICE_TABLE}.last_user), + operating_system=COALESCE(NULLIF(excluded.operating_system, ''), {DEVICE_TABLE}.operating_system), + uptime=COALESCE(NULLIF(excluded.uptime, 0), {DEVICE_TABLE}.uptime), + agent_id=COALESCE(NULLIF(excluded.agent_id, ''), {DEVICE_TABLE}.agent_id), + ansible_ee_ver=COALESCE(NULLIF(excluded.ansible_ee_ver, ''), {DEVICE_TABLE}.ansible_ee_ver), + connection_type=COALESCE(NULLIF(excluded.connection_type, ''), {DEVICE_TABLE}.connection_type), + connection_endpoint=COALESCE(NULLIF(excluded.connection_endpoint, ''), {DEVICE_TABLE}.connection_endpoint) + """ + + params: List[Any] = [ + hostname, + normalized_description, + created_ts, + normalized_hash, + normalized_guid, + column_values.get("memory"), + column_values.get("network"), + column_values.get("software"), + column_values.get("storage"), + column_values.get("cpu"), + column_values.get("device_type"), + column_values.get("domain"), + column_values.get("external_ip"), + column_values.get("internal_ip"), + column_values.get("last_reboot"), + column_values.get("last_seen"), + column_values.get("last_user"), + column_values.get("operating_system"), + column_values.get("uptime"), + column_values.get("agent_id"), + column_values.get("ansible_ee_ver"), + column_values.get("connection_type"), + column_values.get("connection_endpoint"), + ] + cur.execute(sql, params) + + class RepositoryHashCache: """Lightweight GitHub head cache with on-disk persistence.""" @@ -511,6 +730,182 @@ class DeviceManagementService: finally: conn.close() + def save_agent_details(self) -> Tuple[Dict[str, Any], int]: + ctx = getattr(g, "device_auth", None) + if ctx is None: + self.service_log("server", "/api/agent/details missing device auth context", level="ERROR") + return {"error": "auth_context_missing"}, 500 + + payload = request.get_json(silent=True) or {} + details = payload.get("details") + if not isinstance(details, dict): + return {"error": "invalid payload"}, 400 + + hostname = _clean_device_str(payload.get("hostname")) + if not hostname: + summary_host = (details.get("summary") or {}).get("hostname") + hostname = _clean_device_str(summary_host) + if not hostname: + return {"error": "invalid payload"}, 400 + + agent_id = _clean_device_str(payload.get("agent_id")) + agent_hash = _clean_device_str(payload.get("agent_hash")) + + raw_guid = getattr(ctx, "guid", None) + try: + auth_guid = normalize_guid(raw_guid) if raw_guid else None + except Exception: + auth_guid = None + + fingerprint = _clean_device_str(getattr(ctx, "ssl_key_fingerprint", None)) + fingerprint_lower = fingerprint.lower() if fingerprint else "" + scope_hint = getattr(ctx, "service_mode", None) + + conn = self._db_conn() + try: + cur = conn.cursor() + columns_sql = ", ".join(f"d.{col}" for col in self._DEVICE_COLUMNS) + cur.execute( + f"SELECT {columns_sql}, d.ssl_key_fingerprint FROM {DEVICE_TABLE} AS d WHERE d.hostname = ?", + (hostname,), + ) + row = cur.fetchone() + + prev_details: Dict[str, Any] = {} + description = "" + created_at = 0 + existing_guid = None + existing_agent_hash = None + db_fp = "" + + if row: + device_tuple = row[: len(self._DEVICE_COLUMNS)] + previous = self._build_device_payload(device_tuple, (None, None, None)) + try: + prev_details = json.loads(json.dumps(previous.get("details", {}))) + except Exception: + prev_details = previous.get("details", {}) or {} + description = previous.get("description") or "" + created_at = _coerce_int(previous.get("created_at")) or 0 + existing_guid_raw = previous.get("agent_guid") or "" + try: + existing_guid = normalize_guid(existing_guid_raw) if existing_guid_raw else None + except Exception: + existing_guid = None + existing_agent_hash = _clean_device_str(previous.get("agent_hash")) or None + db_fp = (row[-1] or "").strip().lower() if row[-1] else "" + if db_fp and fingerprint_lower and db_fp != fingerprint_lower: + self.service_log( + "server", + f"/api/agent/details fingerprint mismatch host={hostname} guid={auth_guid or existing_guid or ''}", + scope_hint, + level="WARN", + ) + return {"error": "fingerprint_mismatch"}, 403 + + if existing_guid and auth_guid and existing_guid != auth_guid: + self.service_log( + "server", + f"/api/agent/details guid mismatch host={hostname} expected={existing_guid} provided={auth_guid}", + scope_hint, + level="WARN", + ) + return {"error": "guid_mismatch"}, 403 + + incoming_summary = details.setdefault("summary", {}) + if agent_id and not incoming_summary.get("agent_id"): + incoming_summary["agent_id"] = agent_id + if hostname and not incoming_summary.get("hostname"): + incoming_summary["hostname"] = hostname + if agent_hash: + incoming_summary["agent_hash"] = agent_hash + + effective_guid = auth_guid or existing_guid + if effective_guid: + incoming_summary["agent_guid"] = effective_guid + if fingerprint: + incoming_summary.setdefault("ssl_key_fingerprint", fingerprint) + + prev_summary = prev_details.get("summary") if isinstance(prev_details, dict) else {} + if isinstance(prev_summary, dict): + if _is_empty(incoming_summary.get("last_seen")) and not _is_empty(prev_summary.get("last_seen")): + try: + incoming_summary["last_seen"] = int(prev_summary.get("last_seen")) + except Exception: + pass + if _is_empty(incoming_summary.get("last_user")) and not _is_empty(prev_summary.get("last_user")): + incoming_summary["last_user"] = prev_summary.get("last_user") + + merged = _deep_merge_preserve(prev_details, details) + merged_summary = merged.setdefault("summary", {}) + if hostname: + merged_summary.setdefault("hostname", hostname) + if agent_id: + merged_summary.setdefault("agent_id", agent_id) + if agent_hash and _is_empty(merged_summary.get("agent_hash")): + merged_summary["agent_hash"] = agent_hash + if effective_guid: + merged_summary["agent_guid"] = effective_guid + if fingerprint: + merged_summary.setdefault("ssl_key_fingerprint", fingerprint) + if description and _is_empty(merged_summary.get("description")): + merged_summary["description"] = description + if existing_agent_hash and _is_empty(merged_summary.get("agent_hash")): + merged_summary["agent_hash"] = existing_agent_hash + + if created_at <= 0: + created_at = int(time.time()) + try: + merged_summary.setdefault( + "created", + datetime.fromtimestamp(created_at, timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + ) + except Exception: + pass + merged_summary.setdefault("created_at", created_at) + + _device_upsert( + cur, + hostname, + description, + merged, + created_at, + agent_hash=agent_hash or existing_agent_hash, + guid=effective_guid, + ) + + if effective_guid and fingerprint: + now_iso = datetime.now(timezone.utc).isoformat() + cur.execute( + """ + UPDATE devices + SET ssl_key_fingerprint = ?, + key_added_at = COALESCE(key_added_at, ?) + WHERE guid = ? + """, + (fingerprint, now_iso, effective_guid), + ) + cur.execute( + """ + INSERT OR IGNORE INTO device_keys (id, guid, ssl_key_fingerprint, added_at) + VALUES (?, ?, ?, ?) + """, + (str(uuid.uuid4()), effective_guid, fingerprint, now_iso), + ) + + conn.commit() + return {"status": "ok"}, 200 + except Exception as exc: + try: + conn.rollback() + except Exception: + pass + self.logger.debug("Failed to save agent details", exc_info=True) + self.service_log("server", f"/api/agent/details error: {exc}", scope_hint, level="ERROR") + return {"error": "internal error"}, 500 + finally: + conn.close() + def get_device_details(self, hostname: str) -> Tuple[Dict[str, Any], int]: conn = self._db_conn() try: @@ -1028,6 +1423,12 @@ def register_management(app, adapters: "LegacyServiceAdapters") -> None: service = DeviceManagementService(app, adapters) blueprint = Blueprint("devices", __name__) + @blueprint.route("/api/agent/details", methods=["POST"]) + @require_device_auth(adapters.device_auth_manager) + def _agent_details(): + payload, status = service.save_agent_details() + return jsonify(payload), status + @blueprint.route("/api/devices", methods=["GET"]) def _list_devices(): payload, status = service.list_devices()