From 3524faa40ffa0ae9154d4db98b698658fa4e3f24 Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Wed, 22 Oct 2025 13:45:12 -0600 Subject: [PATCH] Add Engine realtime services and agent WebSocket handlers --- Data/Engine/CURRENT_STAGE.md | 2 +- Data/Engine/README.md | 10 +- Data/Engine/bootstrapper.py | 2 +- Data/Engine/interfaces/ws/__init__.py | 7 +- Data/Engine/interfaces/ws/agents/__init__.py | 4 +- Data/Engine/interfaces/ws/agents/events.py | 261 ++++++++++++++- .../interfaces/ws/job_management/__init__.py | 4 +- .../interfaces/ws/job_management/events.py | 29 +- .../repositories/sqlite/device_repository.py | 71 +++++ Data/Engine/services/__init__.py | 3 + Data/Engine/services/container.py | 8 + Data/Engine/services/realtime/__init__.py | 10 + .../services/realtime/agent_registry.py | 301 ++++++++++++++++++ 13 files changed, 683 insertions(+), 29 deletions(-) create mode 100644 Data/Engine/services/realtime/__init__.py create mode 100644 Data/Engine/services/realtime/agent_registry.py diff --git a/Data/Engine/CURRENT_STAGE.md b/Data/Engine/CURRENT_STAGE.md index 80cb072..0dc0eed 100644 --- a/Data/Engine/CURRENT_STAGE.md +++ b/Data/Engine/CURRENT_STAGE.md @@ -43,7 +43,7 @@ - 8.3 Register blueprints through Engine `server.py`; confirm endpoints respond via manual or automated tests. - 8.4 Commit after each major blueprint migration for clear milestones. -9. Rebuild WebSocket interfaces +[COMPLETED] 9. Rebuild WebSocket interfaces - 9.1 Establish feature-scoped modules (e.g., `interfaces/ws/agents/events.py`) and copy event handlers. - 9.2 Replace global state with repository/service calls where feasible; otherwise encapsulate in Engine-managed caches. - 9.3 Validate namespace registration with Socket.IO test clients before committing. diff --git a/Data/Engine/README.md b/Data/Engine/README.md index fda3c1a..bba7df1 100644 --- a/Data/Engine/README.md +++ b/Data/Engine/README.md @@ -39,7 +39,15 @@ The Engine now exposes working HTTP routes alongside the remaining scaffolding: - `Data/Engine/interfaces/http/enrollment.py` handles the enrollment handshake (`/api/agent/enroll/request` and `/api/agent/enroll/poll`) with rate limiting, nonce protection, and repository-backed approvals. - The admin and agent blueprints remain placeholders until their services migrate. -WebSocket namespaces continue to follow the same pattern in `Data/Engine/interfaces/ws/`, with feature-oriented modules (e.g., `agents`, `job_management`) registered by `bootstrapper.bootstrap()` when Socket.IO is available. +## WebSocket interfaces + +Step 9 introduces real-time handlers backed by the new service container: + +- `Data/Engine/services/realtime/agent_registry.py` manages connected-agent state, last-seen persistence, collector updates, and screenshot caches without sharing globals with the legacy server. +- `Data/Engine/interfaces/ws/agents/events.py` ports the agent namespace, handling connect/disconnect logging, heartbeat reconciliation, screenshot relays, macro status broadcasts, and provisioning lookups through the realtime service. +- `Data/Engine/interfaces/ws/job_management/events.py` registers the job namespace; detailed scheduler coordination will arrive with the Step 10 migration. + +The WebSocket factory (`Data/Engine/interfaces/ws/__init__.py`) now accepts the Engine service container so namespaces can resolve dependencies just like their HTTP counterparts. ## Authentication services diff --git a/Data/Engine/bootstrapper.py b/Data/Engine/bootstrapper.py index 7a18f45..56da58e 100644 --- a/Data/Engine/bootstrapper.py +++ b/Data/Engine/bootstrapper.py @@ -50,7 +50,7 @@ def bootstrap() -> EngineRuntime: app.extensions["engine_services"] = services register_http_interfaces(app, services) socketio = create_socket_server(app, settings.socketio) - register_ws_interfaces(socketio) + register_ws_interfaces(socketio, services) logger.info("bootstrap-complete") return EngineRuntime(app=app, settings=settings, socketio=socketio, db_factory=db_factory) diff --git a/Data/Engine/interfaces/ws/__init__.py b/Data/Engine/interfaces/ws/__init__.py index 2850637..5ed3340 100644 --- a/Data/Engine/interfaces/ws/__init__.py +++ b/Data/Engine/interfaces/ws/__init__.py @@ -7,6 +7,7 @@ from typing import Any, Optional from flask import Flask from ...config import SocketIOSettings +from ...services.container import EngineServiceContainer from .agents import register as register_agent_events from .job_management import register as register_job_events @@ -33,14 +34,14 @@ def create_socket_server(app: Flask, settings: SocketIOSettings) -> Optional[Soc return socketio -def register_ws_interfaces(socketio: Any) -> None: - """Attach placeholder namespaces for the Engine Socket.IO server.""" +def register_ws_interfaces(socketio: Any, services: EngineServiceContainer) -> None: + """Attach namespaces for the Engine Socket.IO server.""" if socketio is None: # pragma: no cover - guard return for registrar in (register_agent_events, register_job_events): - registrar(socketio) + registrar(socketio, services) __all__ = ["create_socket_server", "register_ws_interfaces"] diff --git a/Data/Engine/interfaces/ws/agents/__init__.py b/Data/Engine/interfaces/ws/agents/__init__.py index b048906..cc52081 100644 --- a/Data/Engine/interfaces/ws/agents/__init__.py +++ b/Data/Engine/interfaces/ws/agents/__init__.py @@ -7,10 +7,10 @@ from typing import Any from . import events -def register(socketio: Any) -> None: +def register(socketio: Any, services) -> None: """Register agent namespaces on the given Socket.IO *socketio* instance.""" - events.register(socketio) + events.register(socketio, services) __all__ = ["register"] diff --git a/Data/Engine/interfaces/ws/agents/events.py b/Data/Engine/interfaces/ws/agents/events.py index e47aed0..0515de5 100644 --- a/Data/Engine/interfaces/ws/agents/events.py +++ b/Data/Engine/interfaces/ws/agents/events.py @@ -1,20 +1,261 @@ -"""Agent WebSocket event placeholders for the Engine.""" +"""Agent WebSocket event handlers for the Borealis Engine.""" from __future__ import annotations -from typing import Any +import logging +import time +from typing import Any, Dict, Iterable, Optional + +from flask import request + +from Data.Engine.services.container import EngineServiceContainer + +try: # pragma: no cover - optional dependency guard + from flask_socketio import emit, join_room +except Exception: # pragma: no cover - optional dependency guard + emit = None # type: ignore[assignment] + join_room = None # type: ignore[assignment] + +_AGENT_CONTEXT_HEADER = "X-Borealis-Agent-Context" -def register(socketio: Any) -> None: - """Register agent-related namespaces on *socketio*. - - The concrete event handlers will be migrated in later phases. - """ - +def register(socketio: Any, services: EngineServiceContainer) -> None: if socketio is None: # pragma: no cover - guard return - # Placeholder for namespace registration, e.g. ``socketio.on_namespace(...)``. - return + + handlers = _AgentEventHandlers(socketio, services) + socketio.on_event("connect", handlers.on_connect) + socketio.on_event("disconnect", handlers.on_disconnect) + socketio.on_event("agent_screenshot_task", handlers.on_agent_screenshot_task) + socketio.on_event("connect_agent", handlers.on_connect_agent) + socketio.on_event("agent_heartbeat", handlers.on_agent_heartbeat) + socketio.on_event("collector_status", handlers.on_collector_status) + socketio.on_event("request_config", handlers.on_request_config) + socketio.on_event("screenshot", handlers.on_screenshot) + socketio.on_event("macro_status", handlers.on_macro_status) + socketio.on_event("list_agent_windows", handlers.on_list_agent_windows) + socketio.on_event("agent_window_list", handlers.on_agent_window_list) + socketio.on_event("ansible_playbook_cancel", handlers.on_ansible_playbook_cancel) + socketio.on_event("ansible_playbook_run", handlers.on_ansible_playbook_run) + + +class _AgentEventHandlers: + def __init__(self, socketio: Any, services: EngineServiceContainer) -> None: + self._socketio = socketio + self._services = services + self._realtime = services.agent_realtime + self._log = logging.getLogger("borealis.engine.ws.agents") + + # ------------------------------------------------------------------ + # Connection lifecycle + # ------------------------------------------------------------------ + def on_connect(self) -> None: + sid = getattr(request, "sid", "") + remote_addr = getattr(request, "remote_addr", None) + transport = None + try: + transport = request.args.get("transport") # type: ignore[attr-defined] + except Exception: + transport = None + query = self._render_query() + headers = _summarize_socket_headers(getattr(request, "headers", {})) + scope = _canonical_scope(getattr(request.headers, "get", lambda *_: None)(_AGENT_CONTEXT_HEADER)) + self._log.info( + "socket-connect sid=%s ip=%s transport=%r query=%s headers=%s scope=%s", + sid, + remote_addr, + transport, + query, + headers, + scope or "", + ) + + def on_disconnect(self) -> None: + sid = getattr(request, "sid", "") + remote_addr = getattr(request, "remote_addr", None) + self._log.info("socket-disconnect sid=%s ip=%s", sid, remote_addr) + + # ------------------------------------------------------------------ + # Agent coordination + # ------------------------------------------------------------------ + def on_agent_screenshot_task(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + agent_id = payload.get("agent_id") + node_id = payload.get("node_id") + image = payload.get("image_base64", "") + + if not agent_id or not node_id: + self._log.warning("screenshot-task missing identifiers: %s", payload) + return + + if image: + self._realtime.store_task_screenshot(agent_id, node_id, image) + + try: + self._socketio.emit("agent_screenshot_task", payload) + except Exception as exc: # pragma: no cover - network guard + self._log.warning("socket emit failed for agent_screenshot_task: %s", exc) + + def on_connect_agent(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + agent_id = payload.get("agent_id") + if not agent_id: + return + + service_mode = payload.get("service_mode") + record = self._realtime.register_connection(agent_id, service_mode) + + if join_room is not None: # pragma: no branch - optional dependency guard + try: + join_room(agent_id) + except Exception as exc: # pragma: no cover - dependency guard + self._log.debug("join_room failed for %s: %s", agent_id, exc) + + self._log.info( + "agent-connected agent_id=%s mode=%s status=%s", + agent_id, + record.service_mode, + record.status, + ) + + def on_agent_heartbeat(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + record = self._realtime.heartbeat(payload) + if record: + self._log.debug( + "agent-heartbeat agent_id=%s host=%s mode=%s", record.agent_id, record.hostname, record.service_mode + ) + + def on_collector_status(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + self._realtime.collector_status(payload) + + def on_request_config(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + agent_id = payload.get("agent_id") + if not agent_id: + return + config = self._realtime.get_agent_config(agent_id) + if config and emit is not None: + try: + emit("agent_config", {**config, "agent_id": agent_id}) + except Exception as exc: # pragma: no cover - dependency guard + self._log.debug("emit(agent_config) failed for %s: %s", agent_id, exc) + + # ------------------------------------------------------------------ + # Media + relay events + # ------------------------------------------------------------------ + def on_screenshot(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + agent_id = payload.get("agent_id") + image = payload.get("image_base64") + if agent_id and image: + self._realtime.store_agent_screenshot(agent_id, image) + try: + self._socketio.emit("new_screenshot", {"agent_id": agent_id, "image_base64": image}) + except Exception as exc: # pragma: no cover - dependency guard + self._log.warning("socket emit failed for new_screenshot: %s", exc) + + def on_macro_status(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + agent_id = payload.get("agent_id") + node_id = payload.get("node_id") + success = payload.get("success") + message = payload.get("message") + self._log.info( + "macro-status agent=%s node=%s success=%s message=%s", + agent_id, + node_id, + success, + message, + ) + try: + self._socketio.emit("macro_status", payload) + except Exception as exc: # pragma: no cover - dependency guard + self._log.warning("socket emit failed for macro_status: %s", exc) + + def on_list_agent_windows(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + try: + self._socketio.emit("list_agent_windows", payload) + except Exception as exc: # pragma: no cover - dependency guard + self._log.warning("socket emit failed for list_agent_windows: %s", exc) + + def on_agent_window_list(self, data: Optional[Dict[str, Any]]) -> None: + payload = data or {} + try: + self._socketio.emit("agent_window_list", payload) + except Exception as exc: # pragma: no cover - dependency guard + self._log.warning("socket emit failed for agent_window_list: %s", exc) + + def on_ansible_playbook_cancel(self, data: Optional[Dict[str, Any]]) -> None: + try: + self._socketio.emit("ansible_playbook_cancel", data or {}) + except Exception as exc: # pragma: no cover - dependency guard + self._log.warning("socket emit failed for ansible_playbook_cancel: %s", exc) + + def on_ansible_playbook_run(self, data: Optional[Dict[str, Any]]) -> None: + try: + self._socketio.emit("ansible_playbook_run", data or {}) + except Exception as exc: # pragma: no cover - dependency guard + self._log.warning("socket emit failed for ansible_playbook_run: %s", exc) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + def _render_query(self) -> str: + try: + pairs = [f"{k}={v}" for k, v in request.args.items()] # type: ignore[attr-defined] + except Exception: + return "" + return "&".join(pairs) if pairs else "" + + +def _canonical_scope(raw: Optional[str]) -> Optional[str]: + if not raw: + return None + value = "".join(ch for ch in str(raw) if ch.isalnum() or ch in ("_", "-")) + if not value: + return None + return value.upper() + + +def _mask_value(value: str, *, prefix: int = 4, suffix: int = 4) -> str: + try: + if not value: + return "" + stripped = value.strip() + if len(stripped) <= prefix + suffix: + return "*" * len(stripped) + return f"{stripped[:prefix]}***{stripped[-suffix:]}" + except Exception: + return "***" + + +def _summarize_socket_headers(headers: Any) -> str: + try: + items: Iterable[tuple[str, Any]] + if isinstance(headers, dict): + items = headers.items() + else: + items = getattr(headers, "items", lambda: [])() + except Exception: + items = [] + + rendered = [] + for key, value in items: + lowered = str(key).lower() + display = value + if lowered == "authorization": + token = str(value or "") + if token.lower().startswith("bearer "): + display = f"Bearer {_mask_value(token.split(' ', 1)[1])}" + else: + display = _mask_value(token) + elif lowered == "cookie": + display = "" + rendered.append(f"{key}={display}") + return ", ".join(rendered) if rendered else "" __all__ = ["register"] diff --git a/Data/Engine/interfaces/ws/job_management/__init__.py b/Data/Engine/interfaces/ws/job_management/__init__.py index e47eee7..225073f 100644 --- a/Data/Engine/interfaces/ws/job_management/__init__.py +++ b/Data/Engine/interfaces/ws/job_management/__init__.py @@ -7,10 +7,10 @@ from typing import Any from . import events -def register(socketio: Any) -> None: +def register(socketio: Any, services) -> None: """Register job management namespaces on the given Socket.IO *socketio*.""" - events.register(socketio) + events.register(socketio, services) __all__ = ["register"] diff --git a/Data/Engine/interfaces/ws/job_management/events.py b/Data/Engine/interfaces/ws/job_management/events.py index b364ee1..b84aa8f 100644 --- a/Data/Engine/interfaces/ws/job_management/events.py +++ b/Data/Engine/interfaces/ws/job_management/events.py @@ -1,19 +1,30 @@ -"""Job management WebSocket event placeholders for the Engine.""" +"""Job management WebSocket event handlers.""" from __future__ import annotations -from typing import Any +import logging +from typing import Any, Optional + +from Data.Engine.services.container import EngineServiceContainer -def register(socketio: Any) -> None: - """Register job management namespaces on *socketio*. - - Concrete handlers will be migrated in later phases. - """ - +def register(socketio: Any, services: EngineServiceContainer) -> None: if socketio is None: # pragma: no cover - guard return - return + + handlers = _JobEventHandlers(socketio, services) + socketio.on_event("quick_job_result", handlers.on_quick_job_result) + + +class _JobEventHandlers: + def __init__(self, socketio: Any, services: EngineServiceContainer) -> None: + self._socketio = socketio + self._services = services + self._log = logging.getLogger("borealis.engine.ws.jobs") + + def on_quick_job_result(self, data: Optional[dict]) -> None: + self._log.info("quick-job-result received; scheduler migration pending") + # Step 10 will introduce full persistence + broadcast logic. __all__ = ["register"] diff --git a/Data/Engine/repositories/sqlite/device_repository.py b/Data/Engine/repositories/sqlite/device_repository.py index 481243e..b88bb6e 100644 --- a/Data/Engine/repositories/sqlite/device_repository.py +++ b/Data/Engine/repositories/sqlite/device_repository.py @@ -280,6 +280,77 @@ class SQLiteDeviceRepository: ) conn.commit() + def update_device_summary( + self, + *, + hostname: Optional[str], + last_seen: Optional[int] = None, + agent_id: Optional[str] = None, + operating_system: Optional[str] = None, + last_user: Optional[str] = None, + ) -> None: + if not hostname: + return + + normalized_hostname = (hostname or "").strip() + if not normalized_hostname: + return + + fields = [] + params = [] + + if last_seen is not None: + try: + fields.append("last_seen = ?") + params.append(int(last_seen)) + except Exception: + pass + + if agent_id: + try: + candidate = agent_id.strip() + except Exception: + candidate = agent_id + if candidate: + fields.append("agent_id = ?") + params.append(candidate) + + if operating_system: + try: + os_value = operating_system.strip() + except Exception: + os_value = operating_system + if os_value: + fields.append("operating_system = ?") + params.append(os_value) + + if last_user: + try: + user_value = last_user.strip() + except Exception: + user_value = last_user + if user_value: + fields.append("last_user = ?") + params.append(user_value) + + if not fields: + return + + params.append(normalized_hostname) + + with closing(self._connections()) as conn: + cur = conn.cursor() + cur.execute( + f"UPDATE devices SET {', '.join(fields)} WHERE LOWER(hostname) = LOWER(?)", + params, + ) + if cur.rowcount == 0 and agent_id: + cur.execute( + f"UPDATE devices SET {', '.join(fields)} WHERE agent_id = ?", + params[:-1] + [agent_id], + ) + conn.commit() + def _row_to_record(self, row: tuple) -> Optional[DeviceRecord]: try: guid = DeviceGuid(row[0]) diff --git a/Data/Engine/services/__init__.py b/Data/Engine/services/__init__.py index dcb8c24..e8f8f7a 100644 --- a/Data/Engine/services/__init__.py +++ b/Data/Engine/services/__init__.py @@ -18,6 +18,7 @@ from .enrollment import ( EnrollmentValidationError, PollingResult, ) +from .realtime import AgentRealtimeService, AgentRecord __all__ = [ "DeviceAuthService", @@ -32,4 +33,6 @@ __all__ = [ "EnrollmentTokenBundle", "EnrollmentValidationError", "PollingResult", + "AgentRealtimeService", + "AgentRecord", ] diff --git a/Data/Engine/services/container.py b/Data/Engine/services/container.py index 4e883d8..03248c0 100644 --- a/Data/Engine/services/container.py +++ b/Data/Engine/services/container.py @@ -26,6 +26,7 @@ from Data.Engine.services.crypto.signing import ScriptSigner, load_signer from Data.Engine.services.enrollment import EnrollmentService from Data.Engine.services.enrollment.nonce_cache import NonceCache from Data.Engine.services.rate_limit import SlidingWindowRateLimiter +from Data.Engine.services.realtime import AgentRealtimeService __all__ = ["EngineServiceContainer", "build_service_container"] @@ -37,6 +38,7 @@ class EngineServiceContainer: enrollment_service: EnrollmentService jwt_service: JWTService dpop_validator: DPoPValidator + agent_realtime: AgentRealtimeService def build_service_container( @@ -84,12 +86,18 @@ def build_service_container( dpop_validator=dpop_validator, ) + agent_realtime = AgentRealtimeService( + device_repository=device_repo, + logger=log.getChild("agent_realtime"), + ) + return EngineServiceContainer( device_auth=device_auth, token_service=token_service, enrollment_service=enrollment_service, jwt_service=jwt_service, dpop_validator=dpop_validator, + agent_realtime=agent_realtime, ) diff --git a/Data/Engine/services/realtime/__init__.py b/Data/Engine/services/realtime/__init__.py new file mode 100644 index 0000000..661b469 --- /dev/null +++ b/Data/Engine/services/realtime/__init__.py @@ -0,0 +1,10 @@ +"""Realtime coordination services for the Borealis Engine.""" + +from __future__ import annotations + +from .agent_registry import AgentRealtimeService, AgentRecord + +__all__ = [ + "AgentRealtimeService", + "AgentRecord", +] diff --git a/Data/Engine/services/realtime/agent_registry.py b/Data/Engine/services/realtime/agent_registry.py new file mode 100644 index 0000000..5b9bb15 --- /dev/null +++ b/Data/Engine/services/realtime/agent_registry.py @@ -0,0 +1,301 @@ +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from typing import Any, Dict, Mapping, Optional, Tuple + +from Data.Engine.repositories.sqlite import SQLiteDeviceRepository + +__all__ = ["AgentRealtimeService", "AgentRecord"] + + +@dataclass(slots=True) +class AgentRecord: + """In-memory representation of a connected agent.""" + + agent_id: str + hostname: str = "unknown" + agent_operating_system: str = "-" + last_seen: int = 0 + status: str = "orphaned" + service_mode: str = "currentuser" + is_script_agent: bool = False + collector_active_ts: Optional[float] = None + + +class AgentRealtimeService: + """Track realtime agent presence and provide persistence hooks.""" + + def __init__( + self, + *, + device_repository: SQLiteDeviceRepository, + logger: Optional[logging.Logger] = None, + ) -> None: + self._device_repository = device_repository + self._log = logger or logging.getLogger("borealis.engine.services.realtime.agents") + self._agents: Dict[str, AgentRecord] = {} + self._configs: Dict[str, Dict[str, Any]] = {} + self._screenshots: Dict[str, Dict[str, Any]] = {} + self._task_screenshots: Dict[Tuple[str, str], Dict[str, Any]] = {} + + # ------------------------------------------------------------------ + # Agent presence management + # ------------------------------------------------------------------ + def register_connection(self, agent_id: str, service_mode: Optional[str]) -> AgentRecord: + record = self._agents.get(agent_id) or AgentRecord(agent_id=agent_id) + mode = self.normalize_service_mode(service_mode, agent_id) + now = int(time.time()) + + record.service_mode = mode + record.is_script_agent = self._is_script_agent(agent_id) + record.last_seen = now + record.status = "provisioned" if agent_id in self._configs else "orphaned" + self._agents[agent_id] = record + + self._persist_activity( + hostname=record.hostname, + last_seen=record.last_seen, + agent_id=agent_id, + operating_system=record.agent_operating_system, + ) + return record + + def heartbeat(self, payload: Mapping[str, Any]) -> Optional[AgentRecord]: + if not payload: + return None + + agent_id = payload.get("agent_id") + if not agent_id: + return None + + hostname = payload.get("hostname") or "" + mode = self.normalize_service_mode(payload.get("service_mode"), agent_id) + is_script_agent = self._is_script_agent(agent_id) + last_seen = self._coerce_int(payload.get("last_seen"), default=int(time.time())) + operating_system = (payload.get("agent_operating_system") or "-").strip() or "-" + + if hostname: + self._reconcile_hostname_collisions( + hostname=hostname, + agent_id=agent_id, + incoming_mode=mode, + is_script_agent=is_script_agent, + last_seen=last_seen, + ) + + record = self._agents.get(agent_id) or AgentRecord(agent_id=agent_id) + if hostname: + record.hostname = hostname + record.agent_operating_system = operating_system + record.last_seen = last_seen + record.service_mode = mode + record.is_script_agent = is_script_agent + record.status = "provisioned" if agent_id in self._configs else record.status or "orphaned" + self._agents[agent_id] = record + + self._persist_activity( + hostname=record.hostname or hostname, + last_seen=record.last_seen, + agent_id=agent_id, + operating_system=record.agent_operating_system, + ) + return record + + def collector_status(self, payload: Mapping[str, Any]) -> None: + if not payload: + return + + agent_id = payload.get("agent_id") + if not agent_id: + return + + hostname = payload.get("hostname") or "" + mode = self.normalize_service_mode(payload.get("service_mode"), agent_id) + active = bool(payload.get("active")) + last_user = (payload.get("last_user") or "").strip() + + record = self._agents.get(agent_id) or AgentRecord(agent_id=agent_id) + if hostname: + record.hostname = hostname + if mode: + record.service_mode = mode + record.is_script_agent = self._is_script_agent(agent_id) or record.is_script_agent + if active: + record.collector_active_ts = time.time() + self._agents[agent_id] = record + + if ( + last_user + and hostname + and self._is_valid_interactive_user(last_user) + and not self._is_system_service_agent(agent_id, record.is_script_agent) + ): + self._persist_activity( + hostname=hostname, + last_seen=int(time.time()), + agent_id=agent_id, + operating_system=record.agent_operating_system, + last_user=last_user, + ) + + # ------------------------------------------------------------------ + # Configuration management + # ------------------------------------------------------------------ + def set_agent_config(self, agent_id: str, config: Mapping[str, Any]) -> None: + self._configs[agent_id] = dict(config) + record = self._agents.get(agent_id) + if record: + record.status = "provisioned" + + def get_agent_config(self, agent_id: str) -> Optional[Dict[str, Any]]: + config = self._configs.get(agent_id) + if config is None: + return None + return dict(config) + + # ------------------------------------------------------------------ + # Screenshot caches + # ------------------------------------------------------------------ + def store_agent_screenshot(self, agent_id: str, image_base64: str) -> None: + self._screenshots[agent_id] = { + "image_base64": image_base64, + "timestamp": time.time(), + } + + def store_task_screenshot(self, agent_id: str, node_id: str, image_base64: str) -> None: + self._task_screenshots[(agent_id, node_id)] = { + "image_base64": image_base64, + "timestamp": time.time(), + } + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + @staticmethod + def normalize_service_mode(value: Optional[str], agent_id: Optional[str] = None) -> str: + text = "" + try: + if isinstance(value, str): + text = value.strip().lower() + except Exception: + text = "" + + if not text and agent_id: + try: + lowered = agent_id.lower() + if "-svc-" in lowered or lowered.endswith("-svc"): + return "system" + except Exception: + pass + + if text in {"system", "svc", "service", "system_service"}: + return "system" + if text in {"interactive", "currentuser", "user", "current_user"}: + return "currentuser" + return "currentuser" + + @staticmethod + def _coerce_int(value: Any, *, default: int = 0) -> int: + try: + return int(value) + except Exception: + return default + + @staticmethod + def _is_script_agent(agent_id: Optional[str]) -> bool: + try: + return bool(isinstance(agent_id, str) and agent_id.lower().endswith("-script")) + except Exception: + return False + + @staticmethod + def _is_valid_interactive_user(candidate: Optional[str]) -> bool: + if not candidate: + return False + try: + text = str(candidate).strip() + except Exception: + return False + if not text: + return False + upper = text.upper() + if text.endswith("$"): + return False + if "NT AUTHORITY\\" in upper or "NT SERVICE\\" in upper: + return False + if upper.endswith("\\SYSTEM"): + return False + if upper.endswith("\\LOCAL SERVICE"): + return False + if upper.endswith("\\NETWORK SERVICE"): + return False + if upper == "ANONYMOUS LOGON": + return False + return True + + @staticmethod + def _is_system_service_agent(agent_id: str, is_script_agent: bool) -> bool: + try: + lowered = agent_id.lower() + except Exception: + lowered = "" + if is_script_agent: + return False + return "-svc-" in lowered or lowered.endswith("-svc") + + def _reconcile_hostname_collisions( + self, + *, + hostname: str, + agent_id: str, + incoming_mode: str, + is_script_agent: bool, + last_seen: int, + ) -> None: + transferred_config = False + for existing_id, info in list(self._agents.items()): + if existing_id == agent_id: + continue + if info.hostname != hostname: + continue + existing_mode = self.normalize_service_mode(info.service_mode, existing_id) + if existing_mode != incoming_mode: + continue + if is_script_agent and not info.is_script_agent: + self._persist_activity( + hostname=hostname, + last_seen=last_seen, + agent_id=existing_id, + operating_system=info.agent_operating_system, + ) + return + if not transferred_config and existing_id in self._configs and agent_id not in self._configs: + self._configs[agent_id] = dict(self._configs[existing_id]) + transferred_config = True + self._agents.pop(existing_id, None) + if existing_id != agent_id: + self._configs.pop(existing_id, None) + + def _persist_activity( + self, + *, + hostname: Optional[str], + last_seen: Optional[int], + agent_id: Optional[str], + operating_system: Optional[str], + last_user: Optional[str] = None, + ) -> None: + if not hostname: + return + try: + self._device_repository.update_device_summary( + hostname=hostname, + last_seen=last_seen, + agent_id=agent_id, + operating_system=operating_system, + last_user=last_user, + ) + except Exception as exc: # pragma: no cover - defensive logging + self._log.debug("failed to persist device activity for %s: %s", hostname, exc)