mirror of
https://github.com/bunny-lab-io/Borealis.git
synced 2025-10-26 15:21:57 -06:00
Expand agent/server websocket diagnostics
This commit is contained in:
@@ -154,8 +154,29 @@ def _describe_exception(exc: BaseException) -> str:
|
||||
parts.append(f"args={args!r}")
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
details = getattr(exc, "__dict__", None)
|
||||
if isinstance(details, dict):
|
||||
# Capture noteworthy nested attributes such as os_error/errno to help diagnose
|
||||
# connection failures that collapse into generic ConnectionError wrappers.
|
||||
for key in ("os_error", "errno", "code", "status"):
|
||||
if key in details and details[key]:
|
||||
parts.append(f"{key}={details[key]!r}")
|
||||
except Exception:
|
||||
pass
|
||||
return "; ".join(part for part in parts if part)
|
||||
|
||||
|
||||
def _log_exception_trace(prefix: str) -> None:
|
||||
try:
|
||||
tb = traceback.format_exc()
|
||||
if not tb:
|
||||
return
|
||||
for line in tb.rstrip().splitlines():
|
||||
_log_agent(f"{prefix} trace: {line}", fname="agent.error.log")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Headless/service mode flag (skip Qt and interactive UI).
# System mode is selected by either the BOREALIS_AGENT_MODE environment
# variable being 'system' or the --system-service command-line switch.
SYSTEM_SERVICE_MODE = (
    os.environ.get('BOREALIS_AGENT_MODE') == 'system'
    or '--system-service' in sys.argv
)
# String form of the mode: 'system' or 'currentuser'.
SERVICE_MODE = 'currentuser' if not SYSTEM_SERVICE_MODE else 'system'
|
||||
@@ -464,6 +485,31 @@ def _mask_sensitive(value: str, *, prefix: int = 4, suffix: int = 4) -> str:
|
||||
return '***'
|
||||
|
||||
|
||||
def _format_debug_pairs(pairs: Dict[str, Any]) -> str:
|
||||
try:
|
||||
parts = []
|
||||
for key, value in pairs.items():
|
||||
parts.append(f"{key}={value!r}")
|
||||
return ", ".join(parts)
|
||||
except Exception:
|
||||
return repr(pairs)
|
||||
|
||||
|
||||
def _summarize_headers(headers: Dict[str, str]) -> str:
|
||||
try:
|
||||
rendered: List[str] = []
|
||||
for key, value in headers.items():
|
||||
lowered = key.lower()
|
||||
display = value
|
||||
if lowered == 'authorization':
|
||||
token = value.split()[-1] if value and ' ' in value else value
|
||||
display = f"Bearer {_mask_sensitive(token)}"
|
||||
rendered.append(f"{key}={display}")
|
||||
return ", ".join(rendered)
|
||||
except Exception:
|
||||
return '<unavailable>'
|
||||
|
||||
|
||||
def _decode_base64_text(value):
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
@@ -846,10 +892,25 @@ class AgentHttpClient:
|
||||
verify = getattr(self.session, "verify", True)
|
||||
engine = getattr(client, "eio", None)
|
||||
if engine is None:
|
||||
_log_agent(
|
||||
"SocketIO TLS alignment skipped; AsyncClient.eio missing",
|
||||
fname="agent.error.log",
|
||||
)
|
||||
return
|
||||
|
||||
http_iface = getattr(engine, "http", None)
|
||||
|
||||
debug_info = {
|
||||
"verify_type": type(verify).__name__,
|
||||
"verify_value": verify,
|
||||
"engine_type": type(engine).__name__,
|
||||
"http_iface_present": http_iface is not None,
|
||||
}
|
||||
_log_agent(
|
||||
f"SocketIO TLS alignment start: {_format_debug_pairs(debug_info)}",
|
||||
fname="agent.log",
|
||||
)
|
||||
|
||||
def _set_attr(target: Any, name: str, value: Any) -> None:
|
||||
if target is None:
|
||||
return
|
||||
@@ -872,8 +933,16 @@ class AgentHttpClient:
|
||||
try:
|
||||
context = ssl.create_default_context(cafile=verify)
|
||||
context.check_hostname = False
|
||||
_log_agent(
|
||||
f"SocketIO TLS alignment created SSLContext from cafile={verify}",
|
||||
fname="agent.log",
|
||||
)
|
||||
except Exception:
|
||||
context = None
|
||||
_log_agent(
|
||||
f"SocketIO TLS alignment failed to build context from cafile={verify}",
|
||||
fname="agent.error.log",
|
||||
)
|
||||
|
||||
if context is not None:
|
||||
_set_attr(engine, "ssl_context", context)
|
||||
@@ -883,6 +952,10 @@ class AgentHttpClient:
|
||||
_set_attr(http_iface, "ssl_verify", True)
|
||||
_set_attr(http_iface, "verify_ssl", True)
|
||||
_reset_cached_session()
|
||||
_log_agent(
|
||||
"SocketIO TLS alignment applied dedicated SSLContext to engine/http",
|
||||
fname="agent.log",
|
||||
)
|
||||
return
|
||||
|
||||
# Fall back to boolean verification flags when we either do not
|
||||
@@ -896,8 +969,16 @@ class AgentHttpClient:
|
||||
_set_attr(http_iface, "ssl_verify", verify_flag)
|
||||
_set_attr(http_iface, "verify_ssl", verify_flag)
|
||||
_reset_cached_session()
|
||||
_log_agent(
|
||||
f"SocketIO TLS alignment fallback verify_flag={verify_flag}",
|
||||
fname="agent.log",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
_log_agent(
|
||||
"SocketIO TLS alignment encountered unexpected error",
|
||||
fname="agent.error.log",
|
||||
)
|
||||
_log_exception_trace("configure_socketio")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Enrollment & token management
|
||||
@@ -2355,6 +2436,15 @@ async def send_agent_details_once():
|
||||
async def connect():
|
||||
print(f"[INFO] Successfully Connected to Borealis Server!")
|
||||
_log_agent('Connected to server.')
|
||||
try:
|
||||
sid = getattr(sio, 'sid', None)
|
||||
transport = getattr(sio, 'transport', None)
|
||||
_log_agent(
|
||||
f'WebSocket handshake established sid={sid!r} transport={transport!r}',
|
||||
fname='agent.log',
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
await sio.emit('connect_agent', {"agent_id": AGENT_ID, "service_mode": SERVICE_MODE})
|
||||
|
||||
# Send an immediate heartbeat via authenticated REST call.
|
||||
@@ -2649,21 +2739,47 @@ if not SYSTEM_SERVICE_MODE:
|
||||
async def connect_loop():
|
||||
retry = 5
|
||||
client = http_client()
|
||||
attempt = 0
|
||||
while True:
|
||||
attempt += 1
|
||||
try:
|
||||
_log_agent(
|
||||
f'connect_loop attempt={attempt} starting authentication phase',
|
||||
fname='agent.log',
|
||||
)
|
||||
client.ensure_authenticated()
|
||||
auth_snapshot = {
|
||||
'guid_present': bool(client.guid),
|
||||
'access_token': bool(client.access_token),
|
||||
'refresh_token': bool(client.refresh_token),
|
||||
'access_expiry': client.access_expires_at,
|
||||
}
|
||||
_log_agent(
|
||||
f"connect_loop attempt={attempt} auth snapshot: {_format_debug_pairs(auth_snapshot)}",
|
||||
fname='agent.log',
|
||||
)
|
||||
client.configure_socketio(sio)
|
||||
try:
|
||||
setattr(sio, "connection_error", None)
|
||||
except Exception:
|
||||
pass
|
||||
url = client.websocket_base_url()
|
||||
headers = client.auth_headers()
|
||||
header_summary = _summarize_headers(headers)
|
||||
verify_value = getattr(client.session, 'verify', None)
|
||||
_log_agent(
|
||||
f"connect_loop attempt={attempt} dialing websocket url={url} transports=['websocket'] verify={verify_value!r} headers={header_summary}",
|
||||
fname='agent.log',
|
||||
)
|
||||
print(f"[INFO] Connecting Agent to {url}...")
|
||||
_log_agent(f'Connecting to {url}...')
|
||||
await sio.connect(
|
||||
url,
|
||||
transports=['websocket'],
|
||||
headers=client.auth_headers(),
|
||||
headers=headers,
|
||||
)
|
||||
_log_agent(
|
||||
f'connect_loop attempt={attempt} sio.connect completed successfully',
|
||||
fname='agent.log',
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
@@ -2674,8 +2790,13 @@ async def connect_loop():
|
||||
conn_err = None
|
||||
if conn_err:
|
||||
detail = f"{detail}; connection_error={conn_err!r}"
|
||||
print(f"[WebSocket] Server unavailable: {detail}. Retrying in {retry}s...")
|
||||
_log_agent(f'Server unavailable: {detail}', fname='agent.error.log')
|
||||
message = (
|
||||
f"connect_loop attempt={attempt} server unavailable: {detail}. "
|
||||
f"Retrying in {retry}s..."
|
||||
)
|
||||
print(f"[WebSocket] {message}")
|
||||
_log_agent(message, fname='agent.error.log')
|
||||
_log_exception_trace(f'connect_loop attempt={attempt}')
|
||||
await asyncio.sleep(retry)
|
||||
|
||||
if __name__=='__main__':
|
||||
|
||||
@@ -212,6 +212,38 @@ def _write_service_log(service: str, msg: str, scope: Optional[str] = None, *, l
|
||||
pass
|
||||
|
||||
|
||||
def _mask_server_value(value: str, *, prefix: int = 4, suffix: int = 4) -> str:
|
||||
try:
|
||||
if not value:
|
||||
return ''
|
||||
stripped = value.strip()
|
||||
if len(stripped) <= prefix + suffix:
|
||||
return '*' * len(stripped)
|
||||
return f"{stripped[:prefix]}***{stripped[-suffix:]}"
|
||||
except Exception:
|
||||
return '***'
|
||||
|
||||
|
||||
def _summarize_socket_headers(headers) -> str:
|
||||
try:
|
||||
rendered = []
|
||||
for key, value in headers.items():
|
||||
lowered = key.lower()
|
||||
display = value
|
||||
if lowered == 'authorization':
|
||||
if isinstance(value, str) and value.lower().startswith('bearer '):
|
||||
token = value.split(' ', 1)[1]
|
||||
display = f"Bearer {_mask_server_value(token)}"
|
||||
else:
|
||||
display = _mask_server_value(str(value))
|
||||
elif lowered == 'cookie':
|
||||
display = '<redacted>'
|
||||
rendered.append(f"{key}={display}")
|
||||
return ", ".join(rendered)
|
||||
except Exception:
|
||||
return '<header-summary-unavailable>'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Section: Repository Hash Tracking
|
||||
# =============================================================================
|
||||
@@ -8015,6 +8047,55 @@ def screenshot_node_viewer(agent_id, node_id):
|
||||
# =============================================================================
|
||||
# Realtime channels for screenshots, macros, windows, and Ansible control.
|
||||
|
||||
@socketio.on('connect')
def socket_connect():
    """Log diagnostic details for a newly established Socket.IO connection.

    Collects the session id, remote address, log scope (derived from the
    header named by ``_AGENT_CONTEXT_HEADER``), query string, a masked header
    summary and the ``transport`` query argument, then writes a single line to
    the ``server`` service log.  Each guarded lookup falls back to a
    placeholder if it raises.
    """
    def _safe(getter, fallback):
        # Evaluate a lookup, substituting a fallback if it raises.
        try:
            return getter()
        except Exception:
            return fallback

    def _query_text():
        pairs = [f"{k}={v}" for k, v in request.args.items()]  # type: ignore[attr-defined]
        return "&".join(pairs) if pairs else "<none>"

    sid = _safe(lambda: getattr(request, 'sid', '<unknown>'), '<unknown>')
    remote_addr = _safe(lambda: request.remote_addr, None)
    scope = _safe(
        lambda: _canonical_server_scope(request.headers.get(_AGENT_CONTEXT_HEADER)),
        None,
    )
    query_summary = _safe(_query_text, "<unavailable>")
    header_summary = _summarize_socket_headers(getattr(request, 'headers', {}))
    if hasattr(request, 'args'):
        transport = request.args.get('transport')  # type: ignore[attr-defined]
    else:
        transport = None
    _write_service_log(
        'server',
        f"socket.io connect sid={sid} ip={remote_addr} transport={transport!r} query={query_summary} headers={header_summary}",
        scope=scope,
    )
|
||||
|
||||
|
||||
@socketio.on('disconnect')
def socket_disconnect():
    """Log the session id and remote address of a closing Socket.IO connection.

    The log scope is derived from the header named by
    ``_AGENT_CONTEXT_HEADER``.  Every lookup falls back to a placeholder if it
    raises, so the handler itself does not fail on a missing attribute.
    """
    def _safe(getter, fallback):
        # Evaluate a lookup, substituting a fallback if it raises.
        try:
            return getter()
        except Exception:
            return fallback

    sid = _safe(lambda: getattr(request, 'sid', '<unknown>'), '<unknown>')
    remote_addr = _safe(lambda: request.remote_addr, None)
    scope = _safe(
        lambda: _canonical_server_scope(request.headers.get(_AGENT_CONTEXT_HEADER)),
        None,
    )
    _write_service_log(
        'server',
        f"socket.io disconnect sid={sid} ip={remote_addr}",
        scope=scope,
    )
|
||||
|
||||
|
||||
@socketio.on("agent_screenshot_task")
|
||||
def receive_screenshot_task(data):
|
||||
agent_id = data.get("agent_id")
|
||||
@@ -8044,6 +8125,19 @@ def connect_agent(data):
|
||||
if not agent_id:
|
||||
return
|
||||
print(f"Agent connected: {agent_id}")
|
||||
try:
|
||||
scope = _normalize_service_mode((data or {}).get("service_mode"), agent_id)
|
||||
except Exception:
|
||||
scope = None
|
||||
try:
|
||||
sid = getattr(request, 'sid', '<unknown>')
|
||||
except Exception:
|
||||
sid = '<unknown>'
|
||||
_write_service_log(
|
||||
'server',
|
||||
f"socket.io connect_agent agent_id={agent_id} sid={sid} service_mode={scope}",
|
||||
scope=scope,
|
||||
)
|
||||
|
||||
# Join per-agent room so we can address this connection specifically
|
||||
try:
|
||||
@@ -8051,7 +8145,7 @@ def connect_agent(data):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
service_mode = _normalize_service_mode((data or {}).get("service_mode"), agent_id)
|
||||
service_mode = scope if scope else _normalize_service_mode((data or {}).get("service_mode"), agent_id)
|
||||
rec = registered_agents.setdefault(agent_id, {})
|
||||
rec["agent_id"] = agent_id
|
||||
rec["hostname"] = rec.get("hostname", "unknown")
|
||||
|
||||
Reference in New Issue
Block a user