From 737bf1faef4deedc1e154745595d6daffc8ab6b4 Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Sat, 6 Dec 2025 04:58:10 -0700 Subject: [PATCH] Fixed Tunnel Collapse Issues Every 45s --- .../Roles/ReverseTunnel/tunnel_Powershell.py | 6 +- Data/Agent/Roles/role_ReverseTunnel.py | 93 ++++++++++++++++++- Data/Agent/agent.py | 3 +- .../services/WebSocket/Agent/ReverseTunnel.py | 37 +++++++- .../src/Devices/ReverseTunnel/Powershell.jsx | 7 +- 5 files changed, 133 insertions(+), 13 deletions(-) diff --git a/Data/Agent/Roles/ReverseTunnel/tunnel_Powershell.py b/Data/Agent/Roles/ReverseTunnel/tunnel_Powershell.py index db516fb1..759b7ff2 100644 --- a/Data/Agent/Roles/ReverseTunnel/tunnel_Powershell.py +++ b/Data/Agent/Roles/ReverseTunnel/tunnel_Powershell.py @@ -180,7 +180,11 @@ class PowershellChannel: # Include exit code in the close reason for debugging. exit_suffix = f" (exit={self._exit_code})" if self._exit_code is not None else "" close_reason = (reason or "powershell_exit") + exit_suffix - await self._send_close(code, close_reason) + # Always send CLOSE before socket teardown so engine/UI see the reason. + try: + await self._send_close(code, close_reason) + except Exception: + self.role._log("reverse_tunnel ps close send failed", error=True) self.role._log( f"reverse_tunnel ps channel stopped channel={self.channel_id} reason={close_reason}" ) diff --git a/Data/Agent/Roles/role_ReverseTunnel.py b/Data/Agent/Roles/role_ReverseTunnel.py index 7ca61a20..8ab4d9e0 100644 --- a/Data/Agent/Roles/role_ReverseTunnel.py +++ b/Data/Agent/Roles/role_ReverseTunnel.py @@ -150,6 +150,7 @@ class ActiveTunnel: connected: bool = False stopping: bool = False stop_reason: Optional[str] = None + stop_origin: Optional[str] = None class BaseChannel: @@ -516,7 +517,40 @@ class Role: heartbeats = self.loop.create_task(self._heartbeat_loop(tunnel)) watchdog = self.loop.create_task(self._watchdog(tunnel)) tunnel.tasks.extend([sender, receiver, heartbeats, watchdog]) - await asyncio.wait([sender, receiver, heartbeats, watchdog], return_when=asyncio.FIRST_COMPLETED) + task_labels = { + sender: "sender", + receiver: "receiver", + heartbeats: "heartbeat", + watchdog: "watchdog", + } + done, pending = await asyncio.wait(task_labels.keys(), return_when=asyncio.FIRST_COMPLETED) + for finished in done: + label = task_labels.get(finished) or "unknown" + exc_text = "" + try: + exc_obj = finished.exception() + except asyncio.CancelledError: + exc_obj = None + exc_text = " (cancelled)" + except Exception as exc: # pragma: no cover - defensive logging + exc_obj = exc + if exc_obj: + exc_text = f" (exc={exc_obj!r})" + if not tunnel.stop_reason: + tunnel.stop_reason = f"{label}_stopped{exc_text}" + if not tunnel.stop_origin: + tunnel.stop_origin = label + self._log( + f"reverse_tunnel task completed tunnel_id={tunnel.tunnel_id} task={label} stop_reason={tunnel.stop_reason}{exc_text}" + ) + if pending: + try: + self._log( + "reverse_tunnel pending tasks after first completion tunnel_id=%s pending=%s", + # Represent pending tasks by label for debugging. + ) + except Exception: + pass except Exception as exc: self._log(f"reverse_tunnel connection failed tunnel_id={tunnel.tunnel_id}: {exc}", error=True) await self._emit_status({"tunnel_id": tunnel.tunnel_id, "agent_id": self.ctx.agent_id, "status": "error", "reason": "connect_failed"}) @@ -543,10 +577,14 @@ class Role: f"msg_type={frame.msg_type} channel={frame.channel_id} len={len(frame.payload or b'')}" ) except Exception: + if not tunnel.stop_reason: + tunnel.stop_reason = "sender_error" break except asyncio.CancelledError: pass except Exception: + if not tunnel.stop_reason: + tunnel.stop_reason = "sender_failed" self._log(f"reverse_tunnel sender failed tunnel_id={tunnel.tunnel_id}", error=True) finally: self._log(f"reverse_tunnel sender stopped tunnel_id={tunnel.tunnel_id}") @@ -575,9 +613,18 @@ class Role: except asyncio.CancelledError: pass except Exception: + if not tunnel.stop_reason: + tunnel.stop_reason = "receiver_failed" self._log(f"reverse_tunnel receiver failed tunnel_id={tunnel.tunnel_id}", error=True) finally: self._log(f"reverse_tunnel receiver stopped tunnel_id={tunnel.tunnel_id}") + # If no stop_reason was set, emit a CLOSE so engine/UI see a reason. + if not tunnel.stop_reason: + try: + await self._send_frame(tunnel, close_frame(0, CLOSE_UNEXPECTED_DISCONNECT, "receiver_stop")) + tunnel.stop_reason = "receiver_stop" + except Exception: + pass async def _heartbeat_loop(self, tunnel: ActiveTunnel) -> None: try: @@ -588,6 +635,8 @@ class Role: except asyncio.CancelledError: pass except Exception: + if not tunnel.stop_reason: + tunnel.stop_reason = "heartbeat_failed" self._log(f"reverse_tunnel heartbeat failed tunnel_id={tunnel.tunnel_id}", error=True) finally: self._log(f"reverse_tunnel heartbeat loop stopped tunnel_id={tunnel.tunnel_id}") @@ -599,15 +648,19 @@ class Role: now = time.time() if tunnel.idle_seconds and (now - tunnel.last_activity) >= tunnel.idle_seconds: await self._send_frame(tunnel, close_frame(0, CLOSE_IDLE_TIMEOUT, "idle_timeout")) + tunnel.stop_reason = tunnel.stop_reason or "idle_timeout" self._log(f"reverse_tunnel watchdog idle_timeout tunnel_id={tunnel.tunnel_id}") break if tunnel.expires_at and (now - tunnel.expires_at) >= tunnel.grace_seconds: await self._send_frame(tunnel, close_frame(0, CLOSE_GRACE_EXPIRED, "grace_expired")) + tunnel.stop_reason = tunnel.stop_reason or "grace_expired" self._log(f"reverse_tunnel watchdog grace_expired tunnel_id={tunnel.tunnel_id}") break except asyncio.CancelledError: pass except Exception: + if not tunnel.stop_reason: + tunnel.stop_reason = "watchdog_failed" self._log(f"reverse_tunnel watchdog failed tunnel_id={tunnel.tunnel_id}", error=True) finally: self._log(f"reverse_tunnel watchdog stopped tunnel_id={tunnel.tunnel_id}") @@ -692,7 +745,7 @@ class Role: ) async def _send_frame(self, tunnel: ActiveTunnel, frame: TunnelFrame) -> None: - if tunnel.stopping: + if tunnel.stopping and getattr(frame, "msg_type", None) != MSG_CLOSE: return try: tunnel.send_queue.put_nowait(frame) @@ -703,6 +756,11 @@ class Role: tunnel = self._active.get(tunnel_id) if not tunnel: return + if not tunnel.stop_origin: + tunnel.stop_origin = "stop_tunnel" + self._log(f"reverse_tunnel stop_tunnel requested tunnel_id={tunnel_id} code={code} reason={reason}") + if not tunnel.stop_reason: + tunnel.stop_reason = reason or "requested" await self._send_frame(tunnel, close_frame(0, code, reason)) await self._shutdown_tunnel(tunnel, send_close=False) @@ -710,9 +768,33 @@ class Role: if tunnel.stopping: return tunnel.stopping = True - if send_close: + reason_text = tunnel.stop_reason or "closed" + if not tunnel.stop_reason: + tunnel.stop_reason = reason_text + if not tunnel.stop_origin: + tunnel.stop_origin = "shutdown" + self._log( + f"reverse_tunnel shutdown start tunnel_id={tunnel.tunnel_id} stop_reason={tunnel.stop_reason} " + f"stop_origin={tunnel.stop_origin} ws_closed={getattr(tunnel.websocket, 'closed', None)}" + ) + # Stop all channels first so CLOSE frames (with reasons) are sent upstream. + for handler in list(tunnel.channels.values()): try: - await self._send_frame(tunnel, close_frame(0, CLOSE_AGENT_SHUTDOWN, "agent_shutdown")) + await handler.stop(code=CLOSE_UNEXPECTED_DISCONNECT, reason=reason_text or "tunnel_shutdown") + except Exception: + pass + if send_close: + close_payload = close_frame(0, CLOSE_AGENT_SHUTDOWN, reason_text or "agent_shutdown") + try: + await self._send_frame(tunnel, close_payload) + # Give the sender loop a brief window to flush the CLOSE upstream. + await asyncio.sleep(0.05) + except Exception: + pass + # Fallback: if sender task died, try sending directly on the websocket. + try: + if tunnel.websocket and not tunnel.websocket.closed: + await tunnel.websocket.send_bytes(close_payload.encode()) except Exception: pass for task in list(tunnel.tasks): @@ -722,7 +804,8 @@ class Role: pass if tunnel.websocket is not None: try: - await tunnel.websocket.close() + message = (reason_text or "agent_shutdown").encode("utf-8", "ignore")[:120] + await tunnel.websocket.close(message=message) except Exception: pass if tunnel.session is not None: diff --git a/Data/Agent/agent.py b/Data/Agent/agent.py index 5c463e4e..2dc60232 100644 --- a/Data/Agent/agent.py +++ b/Data/Agent/agent.py @@ -2957,7 +2957,8 @@ async def connect_error(data): @sio.event async def disconnect(): print("[WebSocket] Disconnected from Borealis server.") - await stop_all_roles() + # Do not tear down roles/tunnels on control-plane drop; idle/grace timers inside roles handle cleanup. + _log_agent('SocketIO disconnect observed; leaving roles/tunnels running to survive transient drops.', fname='agent.log') CONFIG.data['regions'].clear() CONFIG._write() diff --git a/Data/Engine/services/WebSocket/Agent/ReverseTunnel.py b/Data/Engine/services/WebSocket/Agent/ReverseTunnel.py index 602799ee..221dacd5 100644 --- a/Data/Engine/services/WebSocket/Agent/ReverseTunnel.py +++ b/Data/Engine/services/WebSocket/Agent/ReverseTunnel.py @@ -947,6 +947,7 @@ class ReverseTunnelService: """Handle agent tunnel socket on assigned port.""" tunnel_id = None + tunnel_stop_reason: Optional[str] = None sock_log = self.audit_logger.getChild("agent_socket") try: peer = None @@ -1021,6 +1022,7 @@ class ReverseTunnelService: ) async def _pump_to_operator(): + nonlocal tunnel_stop_reason sock_log_local = sock_log.getChild("recv") while not websocket.closed: try: @@ -1040,6 +1042,28 @@ class ReverseTunnelService: recv_frame.channel_id, len(recv_frame.payload or b""), ) + if recv_frame.msg_type == MSG_CLOSE and recv_frame.channel_id == 0: + try: + close_info = json.loads(recv_frame.payload.decode("utf-8")) + except Exception: + close_info = {} + close_code = close_info.get("code") if isinstance(close_info, dict) else None + close_reason = close_info.get("reason") if isinstance(close_info, dict) else None + tunnel_stop_reason = (close_reason or "").strip() or ( + f"agent_close_code_{close_code}" if close_code is not None else "agent_close" + ) + sock_log_local.info( + "agent_close_frame tunnel_id=%s code=%s reason=%s", + tunnel_id, + close_code, + tunnel_stop_reason or "-", + ) + try: + self.lease_manager.mark_agent_disconnected(tunnel_id) + except Exception: + pass + bridge.agent_to_operator(recv_frame) + break try: self._dispatch_agent_frame(tunnel_id, recv_frame) except Exception: @@ -1080,20 +1104,27 @@ class ReverseTunnelService: except Exception: sock_log.info("agent_socket_handler_failed port=%s tunnel_id=%s", port, tunnel_id, exc_info=True) finally: + ws_close_reason = getattr(websocket, "close_reason", None) + ws_close_code = getattr(websocket, "close_code", None) + close_reason = tunnel_stop_reason or (ws_close_reason if ws_close_reason else None) try: sock_log.info( "agent_socket_closed port=%s tunnel_id=%s code=%s reason=%s", port, tunnel_id, - getattr(websocket, "close_code", None), - getattr(websocket, "close_reason", None), + ws_close_code, + close_reason, ) except Exception: pass if tunnel_id and tunnel_id in self._agent_sockets: self._agent_sockets.pop(tunnel_id, None) if tunnel_id: - self.release_bridge(tunnel_id, reason="agent_socket_closed") + try: + self.lease_manager.mark_agent_disconnected(tunnel_id) + except Exception: + pass + self.release_bridge(tunnel_id, reason=close_reason or "agent_socket_closed") def get_bridge(self, tunnel_id: str) -> Optional["TunnelBridge"]: return self._bridges.get(tunnel_id) diff --git a/Data/Engine/web-interface/src/Devices/ReverseTunnel/Powershell.jsx b/Data/Engine/web-interface/src/Devices/ReverseTunnel/Powershell.jsx index 97f455ec..06e59cff 100644 --- a/Data/Engine/web-interface/src/Devices/ReverseTunnel/Powershell.jsx +++ b/Data/Engine/web-interface/src/Devices/ReverseTunnel/Powershell.jsx @@ -448,17 +448,18 @@ export default function ReverseTunnelPowershell({ device }) { ); const isConnected = sessionState === "connected" || psStatus?.ack; + const isClosed = sessionState === "closed" || psStatus?.closed; const isBusy = sessionState === "requesting" || sessionState === "waiting" || sessionState === "waiting_agent" || sessionState === "lease_issued"; - const canStart = Boolean(agentId) && !isBusy && !isConnected; + const canStart = Boolean(agentId) && !isBusy; const sessionChips = [ { - label: isConnected ? "Connected" : sessionState === "idle" ? "Idle" : sessionState.replace(/_/g, " "), - color: isConnected ? MAGIC_UI.accentC : MAGIC_UI.accentA, + label: isConnected ? "Connected" : isClosed ? "Session ended" : sessionState === "idle" ? "Idle" : sessionState.replace(/_/g, " "), + color: isConnected ? MAGIC_UI.accentC : isClosed ? MAGIC_UI.accentB : MAGIC_UI.accentA, icon: , }, tunnel?.tunnel_id