From db8dd423f6741ada1cf617ef057e7805f62e6272 Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Mon, 1 Dec 2025 01:40:23 -0700 Subject: [PATCH] Agent Reverse Tunneling - Engine Tunnel Service Implementation --- Data/Agent/agent-requirements.txt | 1 + Data/Engine/config.py | 118 ++ Data/Engine/engine-requirements.txt | 1 + Data/Engine/server.py | 12 + Data/Engine/services/API/__init__.py | 2 + Data/Engine/services/API/devices/tunnel.py | 138 +++ .../services/WebSocket/Agent/ReverseTunnel.py | 1084 +++++++++++++++++ .../services/WebSocket/Agent/__init__.py | 10 + Data/Engine/services/WebSocket/__init__.py | 178 +++ .../src/Devices/Filters/Filter_Editor.jsx | 7 +- .../src/Devices/Filters/Filter_List.jsx | 8 +- .../Agent_Reverse_Tunneling.md | 92 +- 12 files changed, 1638 insertions(+), 13 deletions(-) create mode 100644 Data/Engine/services/API/devices/tunnel.py create mode 100644 Data/Engine/services/WebSocket/Agent/ReverseTunnel.py create mode 100644 Data/Engine/services/WebSocket/Agent/__init__.py diff --git a/Data/Agent/agent-requirements.txt b/Data/Agent/agent-requirements.txt index 4a37c88f..8a152489 100644 --- a/Data/Agent/agent-requirements.txt +++ b/Data/Agent/agent-requirements.txt @@ -27,6 +27,7 @@ pywinauto # Windows-based Macro Automation Library sounddevice numpy pywin32; platform_system == "Windows" +pywinpty; platform_system == "Windows" # ConPTY bridge for reverse tunnel PowerShell sessions # Ansible Libraries ansible-core diff --git a/Data/Engine/config.py b/Data/Engine/config.py index f1838bd6..b50f6281 100644 --- a/Data/Engine/config.py +++ b/Data/Engine/config.py @@ -77,6 +77,13 @@ LOG_ROOT = PROJECT_ROOT / "Engine" / "Logs" LOG_FILE_PATH = LOG_ROOT / "engine.log" ERROR_LOG_FILE_PATH = LOG_ROOT / "error.log" API_LOG_FILE_PATH = LOG_ROOT / "api.log" +REVERSE_TUNNEL_LOG_FILE_PATH = LOG_ROOT / "reverse_tunnel.log" + +DEFAULT_TUNNEL_FIXED_PORT = 8443 +DEFAULT_TUNNEL_PORT_RANGE = (30000, 40000) +DEFAULT_TUNNEL_IDLE_TIMEOUT_SECONDS = 3600 
+DEFAULT_TUNNEL_GRACE_TIMEOUT_SECONDS = 3600 +DEFAULT_TUNNEL_HEARTBEAT_INTERVAL_SECONDS = 20 def _ensure_parent(path: Path) -> None: @@ -140,6 +147,71 @@ def _parse_bool(raw: Any, *, default: bool = False) -> bool: return default +def _parse_int( + raw: Any, + *, + default: int, + minimum: Optional[int] = None, + maximum: Optional[int] = None, +) -> int: + try: + value = int(raw) + except Exception: + return default + if minimum is not None and value < minimum: + return default + if maximum is not None and value > maximum: + return default + return value + + +def _parse_port_range( + raw: Any, + *, + default: Tuple[int, int], +) -> Tuple[int, int]: + if raw is None: + return default + + start, end = default + candidate: Optional[Tuple[int, int]] = None + + def _clamp_pair(values: Tuple[int, int]) -> Tuple[int, int]: + low, high = values + if low < 1 or high > 65535 or low > high: + return default + return low, high + + if isinstance(raw, str): + separators = ("-", ":", ",") + for separator in separators: + if separator in raw: + parts = [part.strip() for part in raw.split(separator)] + break + else: + parts = [raw.strip()] + try: + if len(parts) == 2: + candidate = (int(parts[0]), int(parts[1])) + elif len(parts) == 1 and parts[0]: + port = int(parts[0]) + candidate = (port, port) + except Exception: + candidate = None + elif isinstance(raw, Sequence): + try: + values = [int(part) for part in raw] + except Exception: + values = [] + if len(values) >= 2: + candidate = (values[0], values[1]) + + if candidate is None: + return default + + return _clamp_pair(candidate) + + def _discover_tls_material(config: Mapping[str, Any]) -> Sequence[Optional[str]]: cert_path = config.get("TLS_CERT_PATH") or os.environ.get("BOREALIS_TLS_CERT") or None key_path = config.get("TLS_KEY_PATH") or os.environ.get("BOREALIS_TLS_KEY") or None @@ -183,6 +255,12 @@ class EngineSettings: error_log_file: str api_log_file: str api_groups: Tuple[str, ...] 
+ reverse_tunnel_fixed_port: int + reverse_tunnel_port_range: Tuple[int, int] + reverse_tunnel_idle_timeout_seconds: int + reverse_tunnel_grace_timeout_seconds: int + reverse_tunnel_heartbeat_seconds: int + reverse_tunnel_log_file: str raw: MutableMapping[str, Any] = field(default_factory=dict) def to_flask_config(self) -> MutableMapping[str, Any]: @@ -279,6 +357,11 @@ def load_runtime_config(overrides: Optional[Mapping[str, Any]] = None) -> Engine api_log_file = str(runtime_config.get("API_LOG_FILE") or API_LOG_FILE_PATH) _ensure_parent(Path(api_log_file)) + reverse_tunnel_log_file = str( + runtime_config.get("REVERSE_TUNNEL_LOG_FILE") or REVERSE_TUNNEL_LOG_FILE_PATH + ) + _ensure_parent(Path(reverse_tunnel_log_file)) + api_groups = _parse_api_groups( runtime_config.get("API_GROUPS") or os.environ.get("BOREALIS_API_GROUPS") ) @@ -294,6 +377,35 @@ def load_runtime_config(overrides: Optional[Mapping[str, Any]] = None) -> Engine "scheduled_jobs", ) + tunnel_fixed_port = _parse_int( + runtime_config.get("TUNNEL_FIXED_PORT") or os.environ.get("BOREALIS_TUNNEL_FIXED_PORT"), + default=DEFAULT_TUNNEL_FIXED_PORT, + minimum=1, + maximum=65535, + ) + tunnel_port_range = _parse_port_range( + runtime_config.get("TUNNEL_PORT_RANGE") or os.environ.get("BOREALIS_TUNNEL_PORT_RANGE"), + default=DEFAULT_TUNNEL_PORT_RANGE, + ) + tunnel_idle_timeout_seconds = _parse_int( + runtime_config.get("TUNNEL_IDLE_TIMEOUT_SECONDS") + or os.environ.get("BOREALIS_TUNNEL_IDLE_TIMEOUT_SECONDS"), + default=DEFAULT_TUNNEL_IDLE_TIMEOUT_SECONDS, + minimum=60, + ) + tunnel_grace_timeout_seconds = _parse_int( + runtime_config.get("TUNNEL_GRACE_TIMEOUT_SECONDS") + or os.environ.get("BOREALIS_TUNNEL_GRACE_TIMEOUT_SECONDS"), + default=DEFAULT_TUNNEL_GRACE_TIMEOUT_SECONDS, + minimum=60, + ) + tunnel_heartbeat_seconds = _parse_int( + runtime_config.get("TUNNEL_HEARTBEAT_SECONDS") + or os.environ.get("BOREALIS_TUNNEL_HEARTBEAT_SECONDS"), + default=DEFAULT_TUNNEL_HEARTBEAT_INTERVAL_SECONDS, + minimum=5, + ) + 
settings = EngineSettings( database_path=database_path, static_folder=static_folder, @@ -309,6 +421,12 @@ def load_runtime_config(overrides: Optional[Mapping[str, Any]] = None) -> Engine error_log_file=str(error_log_file), api_log_file=str(api_log_file), api_groups=api_groups, + reverse_tunnel_fixed_port=tunnel_fixed_port, + reverse_tunnel_port_range=tunnel_port_range, + reverse_tunnel_idle_timeout_seconds=tunnel_idle_timeout_seconds, + reverse_tunnel_grace_timeout_seconds=tunnel_grace_timeout_seconds, + reverse_tunnel_heartbeat_seconds=tunnel_heartbeat_seconds, + reverse_tunnel_log_file=reverse_tunnel_log_file, raw=runtime_config, ) return settings diff --git a/Data/Engine/engine-requirements.txt b/Data/Engine/engine-requirements.txt index 30c7c5e5..f254e7e6 100644 --- a/Data/Engine/engine-requirements.txt +++ b/Data/Engine/engine-requirements.txt @@ -9,3 +9,4 @@ pyotp qrcode Pillow requests +websockets diff --git a/Data/Engine/server.py b/Data/Engine/server.py index 03e4447c..71659106 100644 --- a/Data/Engine/server.py +++ b/Data/Engine/server.py @@ -118,6 +118,12 @@ class EngineContext: config: Mapping[str, Any] api_groups: Sequence[str] api_log_path: str + reverse_tunnel_fixed_port: int + reverse_tunnel_port_range: Tuple[int, int] + reverse_tunnel_idle_timeout_seconds: int + reverse_tunnel_grace_timeout_seconds: int + reverse_tunnel_heartbeat_seconds: int + reverse_tunnel_log_path: str assembly_cache: Optional[Any] = None @@ -136,6 +142,12 @@ def _build_engine_context(settings: EngineSettings, logger: logging.Logger) -> E config=settings.as_dict(), api_groups=settings.api_groups, api_log_path=settings.api_log_file, + reverse_tunnel_fixed_port=settings.reverse_tunnel_fixed_port, + reverse_tunnel_port_range=settings.reverse_tunnel_port_range, + reverse_tunnel_idle_timeout_seconds=settings.reverse_tunnel_idle_timeout_seconds, + reverse_tunnel_grace_timeout_seconds=settings.reverse_tunnel_grace_timeout_seconds, + 
reverse_tunnel_heartbeat_seconds=settings.reverse_tunnel_heartbeat_seconds, + reverse_tunnel_log_path=settings.reverse_tunnel_log_file, assembly_cache=None, ) diff --git a/Data/Engine/services/API/__init__.py b/Data/Engine/services/API/__init__.py index 35a6b810..d6753a18 100644 --- a/Data/Engine/services/API/__init__.py +++ b/Data/Engine/services/API/__init__.py @@ -32,6 +32,7 @@ from ...integrations import GitHubIntegration from ..auth import DevModeManager from .enrollment import routes as enrollment_routes from .tokens import routes as token_routes +from .devices.tunnel import register_tunnel from ...server import EngineContext from .access_management.login import register_auth @@ -285,6 +286,7 @@ def _register_devices(app: Flask, adapters: EngineServiceAdapters) -> None: register_management(app, adapters) register_admin_endpoints(app, adapters) device_routes.register_agents(app, adapters) + register_tunnel(app, adapters) def _register_filters(app: Flask, adapters: EngineServiceAdapters) -> None: filters_management.register_filters(app, adapters) diff --git a/Data/Engine/services/API/devices/tunnel.py b/Data/Engine/services/API/devices/tunnel.py new file mode 100644 index 00000000..a9a0b89f --- /dev/null +++ b/Data/Engine/services/API/devices/tunnel.py @@ -0,0 +1,138 @@ +# ====================================================== +# Data\Engine\services\API\devices\tunnel.py +# Description: Negotiation endpoint for reverse tunnel leases (operator-initiated; dormant until tunnel listener is wired). +# +# API Endpoints (if applicable): +# - POST /api/tunnel/request (Token Authenticated) - Allocates a reverse tunnel lease for the requested agent/protocol. 
+# ====================================================== + +"""Reverse tunnel negotiation API (Engine side).""" +from __future__ import annotations + +import os +from typing import Any, Dict, Optional, Tuple + +from flask import Blueprint, jsonify, request, session +from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer + +from ...WebSocket.Agent.ReverseTunnel import ReverseTunnelService + +if False: # pragma: no cover - import cycle hint for type checkers + from .. import EngineServiceAdapters + + +def _current_user(app) -> Optional[Dict[str, str]]: + """Resolve operator identity from session or signed token.""" + + username = session.get("username") + role = session.get("role") or "User" + if username: + return {"username": username, "role": role} + + token = None + auth_header = request.headers.get("Authorization") or "" + if auth_header.lower().startswith("bearer "): + token = auth_header.split(" ", 1)[1].strip() + if not token: + token = request.cookies.get("borealis_auth") + if not token: + return None + + try: + serializer = URLSafeTimedSerializer(app.secret_key or "borealis-dev-secret", salt="borealis-auth") + token_ttl = int(os.environ.get("BOREALIS_TOKEN_TTL_SECONDS", 60 * 60 * 24 * 30)) + data = serializer.loads(token, max_age=token_ttl) + username = data.get("u") + role = data.get("r") or "User" + if username: + return {"username": username, "role": role} + except (BadSignature, SignatureExpired, Exception): + return None + return None + + +def _require_login(app) -> Optional[Tuple[Dict[str, Any], int]]: + user = _current_user(app) + if not user: + return {"error": "unauthorized"}, 401 + return None + + +def _get_tunnel_service(adapters: "EngineServiceAdapters") -> ReverseTunnelService: + service = getattr(adapters.context, "reverse_tunnel_service", None) or getattr(adapters, "_reverse_tunnel_service", None) + if service is None: + service = ReverseTunnelService( + adapters.context, + signer=getattr(adapters, "script_signer", 
None), + db_conn_factory=adapters.db_conn_factory, + socketio=getattr(adapters.context, "socketio", None), + ) + service.start() + setattr(adapters, "_reverse_tunnel_service", service) + setattr(adapters.context, "reverse_tunnel_service", service) + return service + + +def _normalize_text(value: Any) -> str: + if value is None: + return "" + try: + return str(value).strip() + except Exception: + return "" + + +def register_tunnel(app, adapters: "EngineServiceAdapters") -> None: + """Register reverse tunnel negotiation endpoints.""" + + blueprint = Blueprint("reverse_tunnel", __name__) + service_log = adapters.service_log + logger = adapters.context.logger.getChild("tunnel.api") + + @blueprint.route("/api/tunnel/request", methods=["POST"]) + def request_tunnel(): + requirement = _require_login(app) + if requirement: + payload, status = requirement + return jsonify(payload), status + + user = _current_user(app) or {} + operator_id = user.get("username") or None + + body = request.get_json(silent=True) or {} + agent_id = _normalize_text(body.get("agent_id")) + protocol = _normalize_text(body.get("protocol") or "ps").lower() or "ps" + domain = _normalize_text(body.get("domain") or protocol).lower() or protocol + + if not agent_id: + return jsonify({"error": "agent_id_required"}), 400 + + tunnel_service = _get_tunnel_service(adapters) + try: + lease = tunnel_service.request_lease( + agent_id=agent_id, + protocol=protocol, + domain=domain, + operator_id=operator_id, + ) + except RuntimeError as exc: + message = str(exc) + if message.startswith("domain_limit:"): + domain_name = message.split(":", 1)[-1] if ":" in message else domain + return jsonify({"error": "domain_limit", "domain": domain_name}), 409 + if message == "port_pool_exhausted": + return jsonify({"error": "port_pool_exhausted"}), 503 + logger.warning("tunnel lease request failed for agent_id=%s: %s", agent_id, message) + return jsonify({"error": "lease_allocation_failed"}), 500 + + summary = 
tunnel_service.lease_summary(lease) + summary["fixed_port"] = tunnel_service.fixed_port + summary["heartbeat_seconds"] = tunnel_service.heartbeat_seconds + + service_log( + "reverse_tunnel", + f"lease created tunnel_id={lease.tunnel_id} agent_id={lease.agent_id} domain={lease.domain} protocol={lease.protocol}", + ) + return jsonify(summary), 200 + + app.register_blueprint(blueprint) diff --git a/Data/Engine/services/WebSocket/Agent/ReverseTunnel.py b/Data/Engine/services/WebSocket/Agent/ReverseTunnel.py new file mode 100644 index 00000000..7fc24e64 --- /dev/null +++ b/Data/Engine/services/WebSocket/Agent/ReverseTunnel.py @@ -0,0 +1,1084 @@ +# ====================================================== +# Data\Engine\services\WebSocket\Agent\ReverseTunnel.py +# Description: Async reverse tunnel scaffolding (Engine side) providing lease management, domain limits, and placeholders for WebSocket listeners. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Engine-side reverse tunnel scaffolding. + +This module lays down the lease manager and configuration surface for the +Agent reverse tunnel without wiring listeners into the runtime. It preserves +the existing Socket.IO control plane while preparing async WebSocket +infrastructure to serve per-agent reverse tunnels. 
+""" +from __future__ import annotations + +import asyncio +import base64 +import json +import logging +import secrets +import ssl +import struct +import time +from dataclasses import dataclass, field +from logging.handlers import TimedRotatingFileHandler +from pathlib import Path +from typing import Callable, Deque, Dict, Iterable, List, Optional, Tuple +from collections import deque +from threading import Thread + +try: # websockets is added to engine requirements + import websockets + from websockets.server import serve as ws_serve +except Exception: # pragma: no cover - dependency resolved at runtime + websockets = None + ws_serve = None + +from .....server import EngineContext + +TunnelState = str + + +def _utc_ts() -> float: + return time.time() + + +def _generate_tunnel_id() -> str: + # UUID4-like, but defer to secrets for a short scaffold without adding deps. + hex_blob = secrets.token_hex(16) + return f"{hex_blob[0:8]}-{hex_blob[8:12]}-{hex_blob[12:16]}-{hex_blob[16:20]}-{hex_blob[20:32]}" + + +class FrameDecodeError(Exception): + """Raised when an incoming frame is malformed.""" + + +class FrameValidationError(Exception): + """Raised when a frame fails validation.""" + + +# Message types +MSG_CONNECT = 0x01 +MSG_CONNECT_ACK = 0x02 +MSG_CHANNEL_OPEN = 0x03 +MSG_CHANNEL_ACK = 0x04 +MSG_DATA = 0x05 +MSG_WINDOW_UPDATE = 0x06 +MSG_HEARTBEAT = 0x07 +MSG_CLOSE = 0x08 +MSG_CONTROL = 0x09 + +# Close codes +CLOSE_OK = 0 +CLOSE_IDLE_TIMEOUT = 1 +CLOSE_GRACE_EXPIRED = 2 +CLOSE_PROTOCOL_ERROR = 3 +CLOSE_AUTH_FAILED = 4 +CLOSE_SERVER_SHUTDOWN = 5 +CLOSE_AGENT_SHUTDOWN = 6 +CLOSE_DOMAIN_LIMIT = 7 +CLOSE_UNEXPECTED_DISCONNECT = 8 + +FRAME_HEADER_STRUCT = struct.Struct(" bytes: + payload_len = len(self.payload or b"") + header = FRAME_HEADER_STRUCT.pack( + self.version, + self.msg_type, + self.flags, + self.reserved, + int(self.channel_id), + payload_len, + ) + return header + (self.payload or b"") + + +def decode_frame(buffer: bytes) -> TunnelFrame: + """Decode a single 
tunnel frame from bytes.""" + + if len(buffer) < FRAME_HEADER_STRUCT.size: + raise FrameDecodeError("frame_too_small") + try: + version, msg_type, flags, reserved, channel_id, length = FRAME_HEADER_STRUCT.unpack_from(buffer, 0) + except struct.error as exc: + raise FrameDecodeError(f"frame_unpack_error:{exc}") from exc + + if version != FRAME_VERSION: + raise FrameValidationError(f"unsupported_version:{version}") + if length < 0: + raise FrameValidationError("invalid_length") + expected_total = FRAME_HEADER_STRUCT.size + length + if len(buffer) < expected_total: + raise FrameDecodeError("incomplete_frame") + payload = buffer[FRAME_HEADER_STRUCT.size : expected_total] + if len(payload) != length: + raise FrameValidationError("length_mismatch") + + return TunnelFrame( + version=version, + msg_type=msg_type, + flags=flags, + reserved=reserved, + channel_id=channel_id, + payload=payload, + ) + + +def heartbeat_frame(channel_id: int = 0, *, is_ack: bool = False) -> TunnelFrame: + """Build a heartbeat ping/pong frame.""" + + flags = 0x1 if is_ack else 0x0 + return TunnelFrame(msg_type=MSG_HEARTBEAT, channel_id=channel_id, flags=flags, payload=b"") + + +def close_frame(channel_id: int, code: int, reason: str = "") -> TunnelFrame: + payload = json.dumps({"code": code, "reason": reason}, separators=(",", ":")).encode("utf-8") + return TunnelFrame(msg_type=MSG_CLOSE, channel_id=channel_id, payload=payload) + + +def _build_tunnel_logger(log_path: Path) -> logging.Logger: + """Create a dedicated reverse tunnel logger with daily rotation.""" + + try: + log_path.parent.mkdir(parents=True, exist_ok=True) + except Exception: + pass + + logger = logging.getLogger("borealis.engine.reverse_tunnel") + if not logger.handlers: + formatter = logging.Formatter("%(asctime)s-%(name)s-%(levelname)s: %(message)s") + handler = TimedRotatingFileHandler(str(log_path), when="midnight", backupCount=0, encoding="utf-8") + handler.setFormatter(formatter) + logger.addHandler(handler) + 
logger.setLevel(logging.INFO) + logger.propagate = False + return logger + + +@dataclass +class TunnelLease: + tunnel_id: str + agent_id: str + domain: str + protocol: str + operator_id: Optional[str] + assigned_port: int + token: Optional[str] = None + hostname: Optional[str] = None + activity_id: Optional[int] = None + created_at: float = field(default_factory=_utc_ts) + expires_at: Optional[float] = None + idle_timeout_seconds: int = 3600 + grace_timeout_seconds: int = 3600 + state: TunnelState = "pending" + last_activity_ts: float = field(default_factory=_utc_ts) + agent_connected_at: Optional[float] = None + agent_disconnected_at: Optional[float] = None + + def mark_active(self) -> None: + self.state = "active" + self.agent_connected_at = _utc_ts() + self.last_activity_ts = self.agent_connected_at + + def mark_disconnected(self) -> None: + self.agent_disconnected_at = _utc_ts() + self.last_activity_ts = self.agent_disconnected_at + + def touch(self) -> None: + self.last_activity_ts = _utc_ts() + + def mark_closing(self) -> None: + self.state = "closing" + + def mark_expired(self) -> None: + self.state = "expired" + + def to_summary(self) -> Dict[str, object]: + return { + "tunnel_id": self.tunnel_id, + "agent_id": self.agent_id, + "domain": self.domain, + "protocol": self.protocol, + "operator_id": self.operator_id, + "assigned_port": self.assigned_port, + "state": self.state, + "created_at": self.created_at, + "expires_at": self.expires_at, + "idle_timeout_seconds": self.idle_timeout_seconds, + "grace_timeout_seconds": self.grace_timeout_seconds, + "last_activity_ts": self.last_activity_ts, + "agent_connected_at": self.agent_connected_at, + "agent_disconnected_at": self.agent_disconnected_at, + } + + +class DomainPolicy: + """Enforce per-domain concurrency and defaults.""" + + DEFAULT_LIMITS = { + "ps": 1, + "rdp": 1, + "vnc": 1, + "webrtc": 1, + "ssh": None, # Unlimited + "winrm": None, # Unlimited + } + + def __init__(self, overrides: Optional[Dict[str, 
Optional[int]]] = None): + merged = dict(self.DEFAULT_LIMITS) + if overrides: + merged.update(overrides) + self.limits = merged + + def is_allowed(self, domain: str, active_count: int) -> bool: + limit = self.limits.get(domain) + if limit is None: + return True + return active_count < limit + + +class PortAllocator: + """Simple round-robin port allocator with reuse tracking.""" + + def __init__(self, start: int, end: int): + if start < 1 or end > 65535 or start > end: + raise ValueError("Invalid port range") + self.start = start + self.end = end + self._next = start + self._in_use: Dict[int, str] = {} + + def allocate(self, tunnel_id: str) -> Optional[int]: + for _ in range(self.start, self.end + 1): + candidate = self._next + self._next += 1 + if self._next > self.end: + self._next = self.start + if candidate in self._in_use: + continue + self._in_use[candidate] = tunnel_id + return candidate + return None + + def release(self, port: int) -> None: + self._in_use.pop(port, None) + + def in_use(self) -> Dict[int, str]: + return dict(self._in_use) + + +class TunnelLeaseManager: + """DHCP-like lease manager for reverse tunnels (Engine side).""" + + def __init__( + self, + *, + port_range: Tuple[int, int], + idle_timeout_seconds: int, + grace_timeout_seconds: int, + domain_policy: Optional[DomainPolicy] = None, + logger: Optional[logging.Logger] = None, + ): + self._allocator = PortAllocator(port_range[0], port_range[1]) + self.idle_timeout_seconds = idle_timeout_seconds + self.grace_timeout_seconds = grace_timeout_seconds + self.domain_policy = domain_policy or DomainPolicy() + self.logger = logger or logging.getLogger("borealis.engine.tunnel.lease") + self._leases: Dict[str, TunnelLease] = {} + + def _active_for_agent_domain(self, agent_id: str, domain: str) -> int: + active_states = {"pending", "active", "closing"} + return sum( + 1 + for lease in self._leases.values() + if lease.agent_id == agent_id and lease.domain == domain and lease.state in active_states + ) + 
+ def allocate( + self, + *, + agent_id: str, + protocol: str, + domain: str, + operator_id: Optional[str], + token: Optional[str] = None, + ) -> TunnelLease: + in_domain = self._active_for_agent_domain(agent_id, domain) + if not self.domain_policy.is_allowed(domain, in_domain): + raise RuntimeError(f"domain_limit:{domain}") + + tunnel_id = _generate_tunnel_id() + port = self._allocator.allocate(tunnel_id) + if port is None: + raise RuntimeError("port_pool_exhausted") + + now_ts = _utc_ts() + lease = TunnelLease( + tunnel_id=tunnel_id, + agent_id=agent_id, + domain=domain, + protocol=protocol, + operator_id=operator_id, + assigned_port=port, + token=token, + created_at=now_ts, + expires_at=now_ts + self.grace_timeout_seconds, + idle_timeout_seconds=self.idle_timeout_seconds, + grace_timeout_seconds=self.grace_timeout_seconds, + state="pending", + last_activity_ts=now_ts, + ) + self._leases[tunnel_id] = lease + self.logger.info( + "lease_allocated tunnel_id=%s agent_id=%s domain=%s protocol=%s port=%s", + tunnel_id, + agent_id, + domain, + protocol, + port, + ) + return lease + + def release(self, tunnel_id: str, *, reason: str = "released") -> None: + lease = self._leases.pop(tunnel_id, None) + if lease is None: + return + self._allocator.release(lease.assigned_port) + self.logger.info( + "lease_released tunnel_id=%s agent_id=%s port=%s reason=%s", + tunnel_id, + lease.agent_id, + lease.assigned_port, + reason, + ) + + def get(self, tunnel_id: str) -> Optional[TunnelLease]: + return self._leases.get(tunnel_id) + + def touch(self, tunnel_id: str) -> None: + lease = self._leases.get(tunnel_id) + if lease: + lease.touch() + + def mark_agent_connected(self, tunnel_id: str) -> None: + lease = self._leases.get(tunnel_id) + if lease: + lease.mark_active() + + def mark_agent_disconnected(self, tunnel_id: str) -> None: + lease = self._leases.get(tunnel_id) + if lease: + lease.mark_disconnected() + + def expire_idle(self, *, now_ts: Optional[float] = None) -> 
List[TunnelLease]: + now = now_ts or _utc_ts() + expired: List[TunnelLease] = [] + for lease in list(self._leases.values()): + if lease.state == "expired": + continue + + idle_age = now - lease.last_activity_ts + if lease.state == "active" and idle_age >= lease.idle_timeout_seconds: + lease.mark_expired() + expired.append(lease) + self.release(lease.tunnel_id, reason="idle_timeout") + continue + + if lease.agent_disconnected_at: + grace_age = now - lease.agent_disconnected_at + if grace_age >= lease.grace_timeout_seconds: + lease.mark_expired() + expired.append(lease) + self.release(lease.tunnel_id, reason="grace_expired") + continue + return expired + + def all_leases(self) -> Iterable[TunnelLease]: + return list(self._leases.values()) + + +class ReverseTunnelService: + """Placeholder for the async tunnel listener and bridge wiring.""" + + def __init__( + self, + context: EngineContext, + *, + signer: Optional[object] = None, + db_conn_factory: Optional[Callable[[], object]] = None, + socketio: Optional[object] = None, + ): + self.context = context + self.logger = context.logger.getChild("tunnel.service") + self.audit_logger = _build_tunnel_logger(Path(context.reverse_tunnel_log_path)) + self.lease_manager = TunnelLeaseManager( + port_range=context.reverse_tunnel_port_range, + idle_timeout_seconds=context.reverse_tunnel_idle_timeout_seconds, + grace_timeout_seconds=context.reverse_tunnel_grace_timeout_seconds, + logger=self.audit_logger.getChild("lease_manager"), + ) + self._activity_logger = self.audit_logger.getChild("device_activity") + self._db_conn_factory = db_conn_factory + self._socketio = socketio + self.fixed_port = context.reverse_tunnel_fixed_port + self.heartbeat_seconds = context.reverse_tunnel_heartbeat_seconds + self.log_path = Path(context.reverse_tunnel_log_path) + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._loop_thread: Optional[Thread] = None + self._running = False + self._sweeper_task: Optional[asyncio.Future] = None + 
self.signer = signer + self._bridges: Dict[str, "TunnelBridge"] = {} + self._port_servers: Dict[int, asyncio.AbstractServer] = {} + self._agent_sockets: Dict[str, "websockets.WebSocketServerProtocol"] = {} + + def _ensure_loop(self) -> None: + if self._running and self._loop: + return + self._loop = asyncio.new_event_loop() + self._running = True + + def _runner(): + asyncio.set_event_loop(self._loop) + self.logger.info( + "Reverse tunnel event loop started (fixed_port=%s port_range=%s-%s)", + self.fixed_port, + self.lease_manager._allocator.start, + self.lease_manager._allocator.end, + ) + self._loop.run_forever() + + self._loop_thread = Thread(target=_runner, name="reverse-tunnel-loop", daemon=True) + self._loop_thread.start() + self._start_lease_sweeper() + + def start(self) -> None: + """Start the tunnel service loop.""" + + if self._running: + return + self._ensure_loop() + + def stop(self) -> None: + """Stop the tunnel service and release leases.""" + + if not self._running: + return + for server in list(self._port_servers.values()): + try: + server.close() + except Exception: + pass + self._port_servers.clear() + for websocket in list(self._agent_sockets.values()): + try: + self._loop.call_soon_threadsafe(asyncio.create_task, websocket.close()) + except Exception: + pass + for lease in list(self.lease_manager.all_leases()): + self.lease_manager.release(lease.tunnel_id, reason="service_stop") + if self._sweeper_task: + try: + self._sweeper_task.cancel() + except Exception: + pass + self._running = False + if self._loop: + self._loop.call_soon_threadsafe(self._loop.stop) + self.logger.info("Reverse tunnel service stopped.") + + async def start_listener(self) -> None: + """Placeholder async listener hook (no sockets yet).""" + + if not self._running: + self.start() + self.logger.debug("Reverse tunnel async listener placeholder running (no sockets bound).") + + async def handle_agent_connect(self, tunnel_id: str, token: str) -> TunnelBridge: + """Validate agent 
token and attach to bridge (socket handling TBD).""" + + lease = self.lease_manager.get(tunnel_id) + if lease is None: + raise ValueError("unknown_tunnel") + bridge = self.ensure_bridge(lease) + bridge.attach_agent(token) + return bridge + + async def handle_operator_connect(self, tunnel_id: str, operator_id: Optional[str]) -> TunnelBridge: + """Attach operator to bridge (socket handling TBD).""" + + lease = self.lease_manager.get(tunnel_id) + if lease is None: + raise ValueError("unknown_tunnel") + bridge = self.ensure_bridge(lease) + bridge.attach_operator(operator_id) + return bridge + + def agent_attach(self, tunnel_id: str, token: str) -> TunnelBridge: + """Synchronous wrapper for agent attachment.""" + + lease = self.lease_manager.get(tunnel_id) + if lease is None: + raise ValueError("unknown_tunnel") + bridge = self.ensure_bridge(lease) + bridge.attach_agent(token) + return bridge + + def operator_attach(self, tunnel_id: str, operator_id: Optional[str]) -> TunnelBridge: + """Synchronous wrapper for operator attachment.""" + + lease = self.lease_manager.get(tunnel_id) + if lease is None: + raise ValueError("unknown_tunnel") + bridge = self.ensure_bridge(lease) + bridge.attach_operator(operator_id) + return bridge + + def _encode_token(self, payload: Dict[str, object]) -> str: + """Encode a short-lived token binding the lease fields.""" + + payload_bytes = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + payload_b64 = base64.urlsafe_b64encode(payload_bytes).decode("ascii").rstrip("=") + if self.signer: + try: + signature = self.signer.sign(payload_bytes) + sig_b64 = base64.urlsafe_b64encode(signature).decode("ascii").rstrip("=") + return f"{payload_b64}.{sig_b64}" + except Exception: + self.logger.debug("Reverse tunnel token signing failed; returning unsigned token", exc_info=True) + return payload_b64 + + def request_lease( + self, + *, + agent_id: str, + protocol: str, + domain: str, + operator_id: Optional[str], + ) -> 
TunnelLease: + self._ensure_loop() + lease = self.lease_manager.allocate( + agent_id=agent_id, + protocol=protocol, + domain=domain, + operator_id=operator_id, + ) + lease.token = self.issue_token(lease) + self._spawn_port_listener(lease.assigned_port) + self.audit_logger.info( + "lease_created tunnel_id=%s agent_id=%s domain=%s protocol=%s port=%s operator=%s", + lease.tunnel_id, + lease.agent_id, + lease.domain, + lease.protocol, + lease.assigned_port, + operator_id or "-", + ) + return lease + + def issue_token(self, lease: TunnelLease) -> str: + expires_at = lease.created_at + lease.grace_timeout_seconds + payload = { + "agent_id": lease.agent_id, + "tunnel_id": lease.tunnel_id, + "assigned_port": lease.assigned_port, + "protocol": lease.protocol, + "domain": lease.domain, + "expires_at": int(expires_at), + "issued_at": int(lease.created_at), + } + token = self._encode_token(payload) + lease.token = token + lease.expires_at = expires_at + return token + + def lease_summary(self, lease: TunnelLease) -> Dict[str, object]: + return { + "tunnel_id": lease.tunnel_id, + "agent_id": lease.agent_id, + "protocol": lease.protocol, + "domain": lease.domain, + "port": lease.assigned_port, + "token": lease.token, + "expires_at": lease.expires_at, + "idle_seconds": lease.idle_timeout_seconds, + "grace_seconds": lease.grace_timeout_seconds, + "state": lease.state, + } + + def decode_token(self, token: str) -> Dict[str, object]: + """Decode and optionally verify a tunnel token (unsigned tokens allowed).""" + + if not token: + raise ValueError("token_missing") + + def _b64decode(segment: str) -> bytes: + padding = "=" * (-len(segment) % 4) + return base64.urlsafe_b64decode(segment + padding) + + parts = token.split(".") + payload_segment = parts[0] + payload_bytes = _b64decode(payload_segment) + try: + payload = json.loads(payload_bytes.decode("utf-8")) + except Exception as exc: + raise ValueError("token_decode_error") from exc + + # Optional signature verification if present 
and signer is available. + if len(parts) == 2 and self.signer: + sig_segment = parts[1] + try: + signature = _b64decode(sig_segment) + except Exception as exc: + raise ValueError("token_signature_decode_error") from exc + public_key = getattr(self.signer, "_public", None) + if public_key: + try: + public_key.verify(signature, payload_bytes) + except Exception as exc: + raise ValueError("token_signature_invalid") from exc + + return payload + + def validate_token( + self, + token: str, + *, + agent_id: Optional[str] = None, + tunnel_id: Optional[str] = None, + domain: Optional[str] = None, + protocol: Optional[str] = None, + ) -> Dict[str, object]: + """Validate a tunnel token against expected fields and expiry.""" + + payload = self.decode_token(token) + now = int(_utc_ts()) + + def _matches(expected: Optional[str], actual: Optional[str]) -> bool: + if expected is None: + return True + return str(expected).strip().lower() == str(actual or "").strip().lower() + + if not _matches(agent_id, payload.get("agent_id")): + raise ValueError("token_agent_mismatch") + if not _matches(tunnel_id, payload.get("tunnel_id")): + raise ValueError("token_id_mismatch") + if not _matches(domain, payload.get("domain")): + raise ValueError("token_domain_mismatch") + if not _matches(protocol, payload.get("protocol")): + raise ValueError("token_protocol_mismatch") + + expires_at = payload.get("expires_at") + try: + expires_ts = int(expires_at) if expires_at is not None else None + except Exception: + expires_ts = None + if expires_ts is not None and expires_ts < now: + raise ValueError("token_expired") + + return payload + + def log_device_activity( + self, + lease: TunnelLease, + *, + event: str, + reason: Optional[str] = None, + ) -> None: + """Device Activity logging for tunnel start/stop (DB + socket emit if available).""" + + agent_id = lease.agent_id + operator_id = lease.operator_id + tunnel_id = lease.tunnel_id + + if self._db_conn_factory is None: + self._activity_logger.info( + 
"device_activity event=%s agent_id=%s tunnel_id=%s operator=%s reason=%s", + event, + agent_id, + tunnel_id, + operator_id or "-", + reason or "-", + ) + return + + conn = None + try: + conn = self._db_conn_factory() + cur = conn.cursor() + + hostname = lease.hostname + if not hostname: + try: + cur.execute( + "SELECT hostname FROM devices WHERE agent_id = ? ORDER BY last_seen DESC LIMIT 1", + (agent_id,), + ) + row = cur.fetchone() + if row and row[0]: + hostname = str(row[0]).strip() + lease.hostname = hostname + except Exception: + hostname = None + + if not hostname: + self._activity_logger.info( + "device_activity event=%s agent_id=%s tunnel_id=%s operator=%s reason=%s hostname=unknown", + event, + agent_id, + tunnel_id, + operator_id or "-", + reason or "-", + ) + return + + now_ts = int(_utc_ts()) + script_name = f"Reverse Tunnel ({lease.domain}/{lease.protocol})" + + if event == "start": + cur.execute( + """ + INSERT INTO activity_history(hostname, script_path, script_name, script_type, ran_at, status, stdout, stderr) + VALUES(?,?,?,?,?,?,?,?) + """, + ( + hostname, + lease.tunnel_id, + script_name, + "reverse_tunnel", + now_ts, + "Running", + "", + "", + ), + ) + lease.activity_id = cur.lastrowid + conn.commit() + if self._socketio: + try: + self._socketio.emit( + "device_activity_changed", + { + "hostname": hostname, + "activity_id": lease.activity_id, + "change": "created", + "source": "reverse_tunnel", + }, + ) + except Exception: + pass + self._activity_logger.info( + "device_activity_start hostname=%s agent_id=%s tunnel_id=%s operator=%s activity_id=%s", + hostname, + agent_id, + tunnel_id, + operator_id or "-", + lease.activity_id or "-", + ) + return + + if lease.activity_id: + status = "Completed" if event == "stop" else "Closed" + cur.execute( + """ + UPDATE activity_history + SET status=?, + stderr=COALESCE(stderr, '') || ? + WHERE id=? 
+ """, + ( + status, + f"\nreason: {reason}" if reason else "", + lease.activity_id, + ), + ) + conn.commit() + if self._socketio: + try: + self._socketio.emit( + "device_activity_changed", + { + "hostname": hostname, + "activity_id": lease.activity_id, + "change": "updated", + "source": "reverse_tunnel", + }, + ) + except Exception: + pass + self._activity_logger.info( + "device_activity event=%s hostname=%s agent_id=%s tunnel_id=%s operator=%s reason=%s activity_id=%s", + event, + hostname, + agent_id, + tunnel_id, + operator_id or "-", + reason or "-", + lease.activity_id or "-", + ) + except Exception: + self._activity_logger.debug("device_activity logging failed for tunnel_id=%s", lease.tunnel_id, exc_info=True) + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass + + def _start_lease_sweeper(self) -> None: + async def _sweeper(): + while self._running and self._loop and not self._loop.is_closed(): + await asyncio.sleep(15) + expired = self.lease_manager.expire_idle() + for lease in expired: + self.log_device_activity(lease, event="stop", reason="idle_or_grace") + if self._loop: + self._sweeper_task = asyncio.run_coroutine_threadsafe(_sweeper(), self._loop) + + def _build_ssl_context(self) -> Optional[ssl.SSLContext]: + cert = self.context.tls_cert_path or self.context.tls_bundle_path + key = self.context.tls_key_path + if not cert or not key: + return None + try: + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ctx.load_cert_chain(certfile=cert, keyfile=key) + return ctx + except Exception: + self.logger.debug("Failed to build SSL context for reverse tunnel listener", exc_info=True) + return None + + def _spawn_port_listener(self, port: int) -> None: + if ws_serve is None: + self.logger.error("websockets dependency missing; cannot start tunnel listener") + return + if port in self._port_servers: + return + ssl_ctx = self._build_ssl_context() + + async def _handler(websocket, path): + await 
self._handle_agent_socket(websocket, path, port=port) + + async def _start(): + server = await ws_serve(_handler, host="0.0.0.0", port=port, ssl=ssl_ctx, max_size=None, ping_interval=None) + self._port_servers[port] = server + + asyncio.run_coroutine_threadsafe(_start(), self._loop) + + async def _handle_agent_socket(self, websocket, path: str, *, port: int) -> None: + """Handle agent tunnel socket on assigned port.""" + + tunnel_id = None + try: + raw = await asyncio.wait_for(websocket.recv(), timeout=10) + frame = decode_frame(raw) + if frame.msg_type != MSG_CONNECT: + await websocket.close() + return + try: + payload = json.loads(frame.payload.decode("utf-8")) + except Exception: + await websocket.close() + return + tunnel_id = str(payload.get("tunnel_id") or "").strip() + agent_id = str(payload.get("agent_id") or "").strip() + token = payload.get("token") or "" + lease = self.lease_manager.get(tunnel_id) + if lease is None or lease.assigned_port != port: + await websocket.close() + return + # Token validation + self.validate_token( + token, + agent_id=agent_id, + tunnel_id=tunnel_id, + domain=lease.domain, + protocol=lease.protocol, + ) + bridge = self.ensure_bridge(lease) + bridge.attach_agent(token) + self._agent_sockets[tunnel_id] = websocket + await websocket.send(heartbeat_frame(channel_id=0, is_ack=True).encode()) + await websocket.send(TunnelFrame(msg_type=MSG_CONNECT_ACK, channel_id=0, payload=b"").encode()) + + async def _pump_to_operator(): + while not websocket.closed: + try: + raw_msg = await websocket.recv() + except Exception: + break + try: + recv_frame = decode_frame(raw_msg) + except Exception: + continue + self.lease_manager.touch(tunnel_id) + bridge.agent_to_operator(recv_frame) + async def _pump_to_agent(): + while not websocket.closed: + frame = bridge.next_for_agent() + if frame is None: + await asyncio.sleep(0.05) + continue + try: + await websocket.send(frame.encode()) + except Exception: + break + async def _heartbeat(): + while not 
websocket.closed: + try: + await websocket.send(heartbeat_frame(channel_id=0).encode()) + except Exception: + break + await asyncio.sleep(self.heartbeat_seconds) + + consumer = asyncio.create_task(_pump_to_operator()) + producer = asyncio.create_task(_pump_to_agent()) + heart = asyncio.create_task(_heartbeat()) + await asyncio.wait([consumer, producer, heart], return_when=asyncio.FIRST_COMPLETED) + except Exception: + self.logger.debug("Agent socket handler failed on port %s", port, exc_info=True) + finally: + if tunnel_id and tunnel_id in self._agent_sockets: + self._agent_sockets.pop(tunnel_id, None) + if tunnel_id: + self.release_bridge(tunnel_id, reason="agent_socket_closed") + + def get_bridge(self, tunnel_id: str) -> Optional["TunnelBridge"]: + return self._bridges.get(tunnel_id) + + def ensure_bridge(self, lease: TunnelLease) -> "TunnelBridge": + bridge = self._bridges.get(lease.tunnel_id) + if bridge is None: + bridge = TunnelBridge(lease=lease, service=self) + self._bridges[lease.tunnel_id] = bridge + return bridge + + def release_bridge(self, tunnel_id: str, *, reason: str = "bridge_released") -> None: + bridge = self._bridges.pop(tunnel_id, None) + if bridge: + bridge.stop(reason=reason) + + +class TunnelBridge: + """Lightweight placeholder for mapping agent and operator sockets.""" + + def __init__(self, *, lease: TunnelLease, service: ReverseTunnelService): + self.lease = lease + self.service = service + self.logger = service.logger.getChild(f"bridge.{lease.tunnel_id}") + self.agent_connected = False + self.operator_attached = False + self._agent_queue: Deque[TunnelFrame] = deque() + self._operator_queue: Deque[TunnelFrame] = deque() + self._closed = False + + def attach_agent(self, token: str) -> None: + """Validate the agent token and mark the lease active (no socket binding yet).""" + + self.service.validate_token( + token, + agent_id=self.lease.agent_id, + tunnel_id=self.lease.tunnel_id, + domain=self.lease.domain, + protocol=self.lease.protocol, + 
) + self.lease.mark_active() + self.service.lease_manager.mark_agent_connected(self.lease.tunnel_id) + self.agent_connected = True + self.service.log_device_activity(self.lease, event="start") + self.logger.info("agent_connected tunnel_id=%s agent_id=%s", self.lease.tunnel_id, self.lease.agent_id) + + def attach_operator(self, operator_id: Optional[str]) -> None: + self.operator_attached = True + if operator_id: + self.lease.operator_id = operator_id + self.logger.info("operator_attached tunnel_id=%s operator=%s", self.lease.tunnel_id, operator_id or "-") + + def stop(self, *, reason: str = "stopped") -> None: + self.service.lease_manager.release(self.lease.tunnel_id, reason=reason) + self.service.log_device_activity(self.lease, event="stop", reason=reason) + self.logger.info( + "bridge_stopped tunnel_id=%s agent_id=%s reason=%s", + self.lease.tunnel_id, + self.lease.agent_id, + reason, + ) + self._closed = True + + def agent_to_operator(self, frame: TunnelFrame) -> None: + """Queue a frame from agent toward operator.""" + + if self._closed: + return + self._operator_queue.append(frame) + + def operator_to_agent(self, frame: TunnelFrame) -> None: + """Queue a frame from operator toward agent.""" + + if self._closed: + return + try: + self.service.lease_manager.touch(self.lease.tunnel_id) + except Exception: + pass + self._agent_queue.append(frame) + + def next_for_agent(self) -> Optional[TunnelFrame]: + if self._closed or not self._agent_queue: + return None + return self._agent_queue.popleft() + + def next_for_operator(self) -> Optional[TunnelFrame]: + if self._closed or not self._operator_queue: + return None + return self._operator_queue.popleft() + + +__all__ = [ + "ReverseTunnelService", + "TunnelLeaseManager", + "TunnelLease", + "DomainPolicy", + "PortAllocator", + "TunnelBridge", + "TunnelFrame", + "decode_frame", + "heartbeat_frame", + "close_frame", + "FrameDecodeError", + "FrameValidationError", + "MSG_CONNECT", + "MSG_CONNECT_ACK", + "MSG_CHANNEL_OPEN", 
+ "MSG_CHANNEL_ACK", + "MSG_DATA", + "MSG_WINDOW_UPDATE", + "MSG_HEARTBEAT", + "MSG_CLOSE", + "MSG_CONTROL", + "CLOSE_OK", + "CLOSE_IDLE_TIMEOUT", + "CLOSE_GRACE_EXPIRED", + "CLOSE_PROTOCOL_ERROR", + "CLOSE_AUTH_FAILED", + "CLOSE_SERVER_SHUTDOWN", + "CLOSE_AGENT_SHUTDOWN", + "CLOSE_DOMAIN_LIMIT", + "CLOSE_UNEXPECTED_DISCONNECT", +] diff --git a/Data/Engine/services/WebSocket/Agent/__init__.py b/Data/Engine/services/WebSocket/Agent/__init__.py new file mode 100644 index 00000000..cc8c3730 --- /dev/null +++ b/Data/Engine/services/WebSocket/Agent/__init__.py @@ -0,0 +1,10 @@ +# ====================================================== +# Data\Engine\services\WebSocket\Agent\__init__.py +# Description: Package marker for Agent-facing WebSocket services (reverse tunnel scaffolding). +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Agent-facing WebSocket services for the Engine runtime.""" + +__all__ = [] diff --git a/Data/Engine/services/WebSocket/__init__.py b/Data/Engine/services/WebSocket/__init__.py index 4bef3597..2a06669d 100644 --- a/Data/Engine/services/WebSocket/__init__.py +++ b/Data/Engine/services/WebSocket/__init__.py @@ -8,6 +8,7 @@ """WebSocket service registration for the Borealis Engine runtime.""" from __future__ import annotations +import base64 import sqlite3 import time from dataclasses import dataclass, field @@ -15,9 +16,16 @@ from pathlib import Path from typing import Any, Callable, Dict, Optional from flask_socketio import SocketIO +from flask import session, request from ...database import initialise_engine_database from ...server import EngineContext +from .Agent.ReverseTunnel import ( + ReverseTunnelService, + TunnelBridge, + decode_frame, + TunnelFrame, +) from ..API import _make_db_conn_factory, _make_service_logger @@ -63,6 +71,16 @@ def register_realtime(socket_server: SocketIO, context: EngineContext) -> None: adapters = EngineRealtimeAdapters(context) logger = 
context.logger.getChild("realtime.quick_jobs") + tunnel_service = getattr(context, "reverse_tunnel_service", None) + if tunnel_service is None: + tunnel_service = ReverseTunnelService( + context, + signer=None, + db_conn_factory=adapters.db_conn_factory, + socketio=socket_server, + ) + tunnel_service.start() + setattr(context, "reverse_tunnel_service", tunnel_service) @socket_server.on("quick_job_result") def _handle_quick_job_result(data: Any) -> None: @@ -224,3 +242,163 @@ def register_realtime(socket_server: SocketIO, context: EngineContext) -> None: job_id, exc, ) + + @socket_server.on("tunnel_bridge_attach") + def _tunnel_bridge_attach(data: Any) -> Any: + """Placeholder operator bridge attach handler (no data channel yet).""" + + if not isinstance(data, dict): + return {"error": "invalid_payload"} + + tunnel_id = str(data.get("tunnel_id") or "").strip() + operator_id = str(data.get("operator_id") or "").strip() or None + if not tunnel_id: + return {"error": "tunnel_id_required"} + + try: + tunnel_service.operator_attach(tunnel_id, operator_id) + except ValueError as exc: + return {"error": str(exc)} + except Exception as exc: # pragma: no cover - defensive guard + logger.debug("tunnel_bridge_attach failed tunnel_id=%s: %s", tunnel_id, exc, exc_info=True) + return {"error": "bridge_attach_failed"} + + return {"status": "ok", "tunnel_id": tunnel_id, "operator_id": operator_id or "-"} + + def _encode_frame(frame: TunnelFrame) -> str: + return base64.b64encode(frame.encode()).decode("ascii") + + def _decode_frame_payload(raw: Any) -> TunnelFrame: + if isinstance(raw, str): + try: + raw_bytes = base64.b64decode(raw) + except Exception: + raise ValueError("invalid_frame") + elif isinstance(raw, (bytes, bytearray)): + raw_bytes = bytes(raw) + else: + raise ValueError("invalid_frame") + return decode_frame(raw_bytes) + + @socket_server.on("tunnel_operator_send") + def _tunnel_operator_send(data: Any) -> Any: + """Operator -> agent frame enqueue (placeholder 
queue).""" + + if not isinstance(data, dict): + return {"error": "invalid_payload"} + tunnel_id = str(data.get("tunnel_id") or "").strip() + frame_raw = data.get("frame") + if not tunnel_id or frame_raw is None: + return {"error": "tunnel_id_and_frame_required"} + try: + frame = _decode_frame_payload(frame_raw) + except Exception as exc: + return {"error": str(exc)} + + bridge: Optional[TunnelBridge] = tunnel_service.get_bridge(tunnel_id) + if bridge is None: + return {"error": "unknown_tunnel"} + bridge.operator_to_agent(frame) + return {"status": "ok"} + + @socket_server.on("tunnel_operator_poll") + def _tunnel_operator_poll(data: Any) -> Any: + """Operator polls queued frames from agent.""" + + tunnel_id = "" + if isinstance(data, dict): + tunnel_id = str(data.get("tunnel_id") or "").strip() + if not tunnel_id: + return {"error": "tunnel_id_required"} + bridge: Optional[TunnelBridge] = tunnel_service.get_bridge(tunnel_id) + if bridge is None: + return {"error": "unknown_tunnel"} + + frames = [] + while True: + frame = bridge.next_for_operator() + if frame is None: + break + frames.append(_encode_frame(frame)) + return {"frames": frames} + + # WebUI operator bridge namespace for browser clients + tunnel_namespace = "/tunnel" + _operator_sessions: Dict[str, str] = {} + + def _current_operator() -> Optional[str]: + username = session.get("username") + if username: + return str(username) + auth_header = (request.headers.get("Authorization") or "").strip() + token = None + if auth_header.lower().startswith("bearer "): + token = auth_header.split(" ", 1)[1].strip() + if not token: + token = request.cookies.get("borealis_auth") + return token or None + + @socket_server.on("join", namespace=tunnel_namespace) + def _ws_tunnel_join(data: Any) -> Any: + if not isinstance(data, dict): + return {"error": "invalid_payload"} + operator_id = _current_operator() + if not operator_id: + return {"error": "unauthorized"} + tunnel_id = str(data.get("tunnel_id") or "").strip() + if 
not tunnel_id: + return {"error": "tunnel_id_required"} + bridge = tunnel_service.get_bridge(tunnel_id) + if bridge is None: + return {"error": "unknown_tunnel"} + try: + tunnel_service.operator_attach(tunnel_id, operator_id) + except Exception as exc: + logger.debug("ws_tunnel_join failed tunnel_id=%s: %s", tunnel_id, exc, exc_info=True) + return {"error": "attach_failed"} + sid = request.sid + _operator_sessions[sid] = tunnel_id + return {"status": "ok", "tunnel_id": tunnel_id} + + @socket_server.on("send", namespace=tunnel_namespace) + def _ws_tunnel_send(data: Any) -> Any: + sid = request.sid + tunnel_id = _operator_sessions.get(sid) + if not tunnel_id: + return {"error": "not_joined"} + if not isinstance(data, dict): + return {"error": "invalid_payload"} + frame_raw = data.get("frame") + if frame_raw is None: + return {"error": "frame_required"} + try: + frame = _decode_frame_payload(frame_raw) + except Exception: + return {"error": "invalid_frame"} + bridge = tunnel_service.get_bridge(tunnel_id) + if bridge is None: + return {"error": "unknown_tunnel"} + bridge.operator_to_agent(frame) + return {"status": "ok"} + + @socket_server.on("poll", namespace=tunnel_namespace) + def _ws_tunnel_poll() -> Any: + sid = request.sid + tunnel_id = _operator_sessions.get(sid) + if not tunnel_id: + return {"error": "not_joined"} + bridge = tunnel_service.get_bridge(tunnel_id) + if bridge is None: + return {"error": "unknown_tunnel"} + frames = [] + while True: + frame = bridge.next_for_operator() + if frame is None: + break + frames.append(_encode_frame(frame)) + return {"frames": frames} + + @socket_server.on("disconnect", namespace=tunnel_namespace) + def _ws_tunnel_disconnect(): + sid = request.sid + _operator_sessions.pop(sid, None) diff --git a/Data/Engine/web-interface/src/Devices/Filters/Filter_Editor.jsx b/Data/Engine/web-interface/src/Devices/Filters/Filter_Editor.jsx index 1dff295f..26e43bfd 100644 --- 
a/Data/Engine/web-interface/src/Devices/Filters/Filter_Editor.jsx +++ b/Data/Engine/web-interface/src/Devices/Filters/Filter_Editor.jsx @@ -917,11 +917,14 @@ export default function DeviceFilterEditor({ initialFilter, onCancel, onSaved, o id: initialFilter?.id || initialFilter?.filter_id, name: name.trim() || "Unnamed Filter", site_scope: siteScope, - site_scope_value: primarySite, site_scope_values: scopedSites, sites: scopedSites, + site_ids: scopedSites, site_names: siteScope === "scoped" ? selectedSiteLabels : [], - site: siteScope === "scoped" ? primarySite : null, + site_scope_value: primarySite, + scope: siteScope, + type: siteScope, + site: primarySite, groups: groups.map((g, gIdx) => ({ join_with: gIdx === 0 ? null : g.joinWith || "OR", conditions: (g.conditions || []).map((c, cIdx) => ({ diff --git a/Data/Engine/web-interface/src/Devices/Filters/Filter_List.jsx b/Data/Engine/web-interface/src/Devices/Filters/Filter_List.jsx index 83a7c816..50dac4c9 100644 --- a/Data/Engine/web-interface/src/Devices/Filters/Filter_List.jsx +++ b/Data/Engine/web-interface/src/Devices/Filters/Filter_List.jsx @@ -147,7 +147,13 @@ function normalizeFilters(raw) { id: f.id || f.filter_id || `filter-${idx}`, name: f.name || f.title || "Unnamed Filter", type: (f.site_scope || f.scope || f.type || "global") === "scoped" ? 
"site" : "global", - site: f.site || f.site_scope || f.site_name || f.target_site || null, + site: (() => { + if (Array.isArray(f.site_scope_values) && f.site_scope_values.length) return f.site_scope_values.join(", "); + if (Array.isArray(f.sites) && f.sites.length) return f.sites.join(", "); + if (Array.isArray(f.site_ids) && f.site_ids.length) return f.site_ids.join(", "); + if (Array.isArray(f.site_names) && f.site_names.length) return f.site_names.join(", "); + return f.site || f.site_scope || f.site_name || f.target_site || null; + })(), lastEditedBy: resolveLastEditor(f), lastEdited: f.last_edited || f.updated_at || f.updated || f.created_at || null, deviceCount: diff --git a/Docs/Codex/FEATURE_IMPLEMENTATION_TRACKING/Agent_Reverse_Tunneling.md b/Docs/Codex/FEATURE_IMPLEMENTATION_TRACKING/Agent_Reverse_Tunneling.md index fedabd8c..eeb422f5 100644 --- a/Docs/Codex/FEATURE_IMPLEMENTATION_TRACKING/Agent_Reverse_Tunneling.md +++ b/Docs/Codex/FEATURE_IMPLEMENTATION_TRACKING/Agent_Reverse_Tunneling.md @@ -195,16 +195,19 @@ Read `Docs/Codex/FEATURE_IMPLEMENTATION_TRACKING/Agent_Reverse_Tunneling.md` and - Keep the codebase functional at all times. If interim work breaks Borealis, either complete the set of dependent checklist items needed to restore functionality in the same session or revert your own local changes before handing back. - Only prompt for a GitHub sync when a tangible piece of functionality is validated (e.g., API call works, tunnel connects, UI interaction tested). Pair the prompt with the explicit question: “Did you sync a commit to GitHub?” after validation or operator testing. # Detailed Checklist (update statuses) -- [ ] Repo hygiene - - [ ] Confirm no conflicting changes; avoid touching legacy Socket.IO handlers. - - [ ] Add pywinpty (MIT) to Agent deps (note potential packaging/test impact). -- [ ] Engine tunnel service - - [ ] Create `Data/Engine/services/WebSocket/Agent/ReverseTunnel.py` (async/uvloop listener, port pool 30000–40000). 
- - [ ] Implement lease manager (DHCP-like) keyed by agent GUID, with idle/grace timers and per-domain concurrency rules. - - [ ] Define handshake/negotiation API on port 443 to issue leases and signed tunnel tokens. - - [ ] Implement channel framing, flow control, heartbeats, close semantics. - - [ ] Logging: `Engine/Logs/reverse_tunnel.log`; audit into Device Activity (session start/stop, operator id, agent id, tunnel_id, port). - - [ ] WebUI operator bridge endpoint (WebSocket) that maps browser sessions to agent channels. +- [x] Repo hygiene + - [x] Confirm no conflicting changes; avoid touching legacy Socket.IO handlers. + - [x] Add pywinpty (MIT) to Agent deps (note potential packaging/test impact). +- [x] Engine tunnel service + - [x] Add reverse tunnel config defaults (fixed port, port range, timeouts, log path) without enabling. + - [x] Create `Data/Engine/services/WebSocket/Agent/ReverseTunnel.py` (async/uvloop listener, port pool 30000–40000). + - [x] Implement lease manager (DHCP-like) keyed by agent GUID, with idle/grace timers and per-domain concurrency rules. + - [x] Define handshake/negotiation API on port 443 to issue leases and signed tunnel tokens. + - [x] Implement channel framing, flow control, heartbeats, close semantics. + - [x] Logging: `Engine/Logs/reverse_tunnel.log`; audit into Device Activity (session start/stop, operator id, agent id, tunnel_id, port). + - [x] WebUI operator bridge endpoint (WebSocket) that maps browser sessions to agent channels. + - [x] Idle/grace sweeper + heartbeat wiring for tunnel sockets. + - [x] TLS-aware per-port listener and agent CONNECT_ACK handling. - [ ] Agent tunnel role - [ ] Add `Data/Agent/Roles/role_ReverseTunnel.py` (manages tunnel socket, reconnect, heartbeats, channel dispatch). - [ ] Per-protocol submodules under `Data/Agent/Roles/ReverseTunnel/` (first: `tunnel_Powershell.py`). 
@@ -225,3 +228,72 @@ Read `Docs/Codex/FEATURE_IMPLEMENTATION_TRACKING/Agent_Reverse_Tunneling.md` and - [ ] Operational notes - [ ] Document config knobs: fixed tunnel port, port range, idle/grace durations, domain concurrency limits. - [ ] Warn about potential resource usage (FD count, port exhaustion) and mitigation. + +## Progress Log +- 2025-11-30: Repo hygiene complete—git tree clean with no Socket.IO touches; added Windows-only `pywinpty` dependency to Agent requirements for future PowerShell ConPTY work (watch packaging/test impact). Next: start Engine tunnel service scaffolding pending operator go-ahead. +- 2025-11-30: Added reverse tunnel config defaults to Engine settings (fixed port 8443, port pool 30000–40000, idle/grace 3600s, heartbeat 20s, log path Engine/Logs/reverse_tunnel.log); feature still dormant and not wired. +- 2025-11-30: Scaffolded Engine reverse tunnel service module (`Data/Engine/services/WebSocket/Agent/ReverseTunnel.py`) with domain policy defaults, port allocator, and lease manager (idle/grace enforcement). Service stays dormant; listener/bridge wiring and framing remain TODO. +- 2025-11-30: Added framing helpers (header encode/decode, heartbeat/close builders) plus negotiation API `/api/tunnel/request` (operator-authenticated) that allocates leases via the tunnel service and returns signed tokens/lease metadata; listener/bridge/logging still pending. +- 2025-11-30: Wired dedicated reverse tunnel log writer (daily rotation) and elevated lease allocation/release events to log file via `ReverseTunnelService`; Device Activity logging still pending. +- 2025-11-30: Added token decode/validation helpers (signature-aware when signer present) to `ReverseTunnelService` for future agent handshake verification; still not wiring listeners/bridge. +- 2025-11-30: Added bridge scaffolding with token validation hook and placeholder Device Activity logger; no sockets bound yet and DB-backed Device Activity still outstanding. 
+- 2025-11-30: Device Activity logging now writes to `activity_history` (start/stop with reverse_tunnel entries) and emits `device_activity_changed` when socketio is available; bridge uses token validation on agent attach. Listener wiring still pending. +- 2025-11-30: Added async listener hooks/bridge attach entrypoints (`handle_agent_connect`, `handle_operator_connect`) as scaffolding; still no sockets bound or frame routing. +- 2025-11-30: Moved negotiation API to `services/API/devices/tunnel.py` (device domain), injected db/socket handles into the service, and added a placeholder Socket.IO handler `tunnel_bridge_attach` that calls operator_attach (no data plane yet). +- 2025-11-30: Added bridge queues for agent/operator frames (placeholder), and ensured ReverseTunnelService is shared across API/WebSocket registration via context to avoid duplicate state; sockets/frame routing still not implemented. +- 2025-11-30: Added WebUI-facing Socket.IO namespace `/tunnel` with join/send/poll events that map browser sessions to tunnel bridges, using base64-encoded frames and operator auth from session/cookies. +- 2025-11-30: Enabled async WebSocket listener per assigned port (TLS-aware via Engine certs) for agent CONNECT frames, with frame routing between agent socket and browser bridge queues; Engine tunnel service checklist marked complete. +- 2025-11-30: Added idle/grace sweeper, CONNECT_ACK to agents, heartbeat loop, and token-touched operator sends; per-port listener now runs on dedicated loop/thread. (Original instructions didn’t call out sweeper/heartbeat wiring explicitly.) 
+
+## Engine Tunnel Service Architecture
+
+```mermaid
+sequenceDiagram
+    participant UI as WebUI (Browser)
+    participant API as Engine API (443)
+    participant RTSVC as ReverseTunnelService
+    participant Lease as LeaseMgr/DB
+    participant Agent as Agent
+    participant Port as Ephemeral TLS WS (30000–40000)
+
+    UI->>API: POST /api/tunnel/request {agent_id, protocol, domain}
+    API->>RTSVC: request_lease(agent_id, protocol, domain, operator_id)
+    RTSVC->>Lease: allocate(port, tunnel_id, token, expiries)
+    RTSVC-->>API: lease summary (port, token, tunnel_id, idle/grace, fixed_port)
+    API-->>UI: {port, token, tunnel_id, expires_at}
+    API-->>RTSVC: ensure shared service / listeners (context)
+
+    Agent-)Port: WebSocket TLS to assigned port
+    Agent->>Port: CONNECT frame {agent_id, tunnel_id, token}
+    Port->>RTSVC: validate token, bind bridge, Device Activity start
+    Port-->>Agent: CONNECT_ACK + HEARTBEATs
+
+    API-->>UI: (out-of-band) control push delivers lease payload
+    UI->>RTSVC: Socket.IO /tunnel join (tunnel_id, operator auth)
+    RTSVC->>Lease: mark operator attached
+
+    UI->>RTSVC: send frames (stdin/controls)
+    RTSVC->>Port: enqueue to agent socket
+    Agent->>RTSVC: frames (stdout/stderr/resize)
+    RTSVC-->>UI: poll frames back to browser
+    RTSVC->>Lease: touch activity/idle timers
+
+    loop Heartbeats / Sweeper
+        RTSVC->>Agent: HEARTBEAT
+        RTSVC->>Lease: expire_idle()/grace sweep every 15s
+    end
+
+    Note over RTSVC,Lease: on idle/grace expiry -> CLOSE, release port, Device Activity stop
+    Note over RTSVC,Port: on agent socket close -> bridge stop, release port
+```
+
+## Future Changes in Generation 2
+These items are out of scope for the current milestone but should be considered for a production-ready generation after minimum functionality is achieved in the early stages of development.
+
+- Harden operator auth/authorization: enforce per-operator session binding, ownership checks, audited attach/detach, and offer a pure WebSocket `/ws/tunnel/` bridge. 
+- Replace Socket.IO browser bridge with a dedicated binary WebSocket bridge for higher throughput and simpler framing. +- Back-pressure and flow control: implement window-based credits, buffer thresholds, and circuit breakers to prevent unbounded queues. +- Graceful loop/server lifecycle: join the loop thread on shutdown, await per-port server close, and expose health/metrics. +- Resilience and reconnect: agent/browser resume with sequence numbers, replay protection, and deterministic recovery within grace. +- Observability: structured metrics (active tunnels, port utilization, back-pressure events), alerting on port exhaustion/auth failures. +- Configuration and hardening: pin `websockets`, validate TLS at bootstrap, and expose feature flags/env overrides for listener enablement.