Added Agent Info Reporting and Heartbeat

This commit is contained in:
2025-08-07 23:49:57 -06:00
parent cf45f7b68f
commit 0a2e225a63
2 changed files with 151 additions and 88 deletions

View File

@@ -10,9 +10,10 @@ from functools import partial
from io import BytesIO from io import BytesIO
import base64 import base64
import traceback import traceback
import random # Macro Randomization import random # Macro Randomization
import platform # OS Detection import platform # OS Detection
import importlib.util import importlib.util
import time # Heartbeat timestamps
import socketio import socketio
from qasync import QEventLoop from qasync import QEventLoop
@@ -99,20 +100,62 @@ clear_regions_only()
# CORE SECTION: OPERATING SYSTEM DETECTION # CORE SECTION: OPERATING SYSTEM DETECTION
# ////////////////////////////////////////////////////////////////////////// # //////////////////////////////////////////////////////////////////////////
def detect_agent_os(): def detect_agent_os():
plat = platform.system().lower() """
if plat.startswith('win'): Detects the full, user-friendly operating system name and version.
return 'windows' Examples:
elif plat.startswith('linux'): - "Windows 11"
return 'linux' - "Windows 10"
elif plat.startswith('darwin'): - "Fedora Workstation 42"
return 'macos' - "Ubuntu 22.04 LTS"
else: - "macOS Sonoma"
return 'unknown' Falls back to a generic name if detection fails.
"""
try:
plat = platform.system().lower()
if plat.startswith('win'):
# On Windows, platform.release() gives major version (e.g., "10", "11")
# platform.version() can also give build info, but isn't always user-friendly
return f"Windows {platform.release()}"
elif plat.startswith('linux'):
try:
import distro # External package, better for Linux OS detection
name = distro.name(pretty=True) # e.g., "Fedora Workstation 42"
if name:
return name
else:
# Fallback if pretty name not found
return f"{platform.system()} {platform.release()}"
except ImportError:
# Fallback to basic info if distro not installed
return f"{platform.system()} {platform.release()}"
elif plat.startswith('darwin'):
# macOS — platform.mac_ver()[0] returns version number
version = platform.mac_ver()[0]
# Optional: map version numbers to marketing names
macos_names = {
"14": "Sonoma",
"13": "Ventura",
"12": "Monterey",
"11": "Big Sur",
"10.15": "Catalina"
}
pretty_name = macos_names.get(".".join(version.split(".")[:2]), "")
return f"macOS {pretty_name or version}"
else:
return f"Unknown OS ({platform.system()} {platform.release()})"
except Exception as e:
print(f"[WARN] OS detection failed: {e}")
return "Unknown"
CONFIG.data['agent_operating_system'] = detect_agent_os() CONFIG.data['agent_operating_system'] = detect_agent_os()
CONFIG._write() CONFIG._write()
# ////////////////////////////////////////////////////////////////////////// # //////////////////////////////////////////////////////////////////////////
# CORE SECTION: MACRO AUTOMATION # CORE SECTION: MACRO AUTOMATION
# ////////////////////////////////////////////////////////////////////////// # //////////////////////////////////////////////////////////////////////////
MACRO_ENGINE_PATH = os.path.join(os.path.dirname(__file__), "Python_API_Endpoints", "macro_engines.py") MACRO_ENGINE_PATH = os.path.join(os.path.dirname(__file__), "Python_API_Endpoints", "macro_engines.py")
@@ -120,7 +163,7 @@ spec = importlib.util.spec_from_file_location("macro_engines", MACRO_ENGINE_PATH
macro_engines = importlib.util.module_from_spec(spec) macro_engines = importlib.util.module_from_spec(spec)
spec.loader.exec_module(macro_engines) spec.loader.exec_module(macro_engines)
# ////////////////////////////////////////////////////////////////////////// # //////////////////////////////////////////////////////////////////////////
# CORE SECTION: ASYNC TASK / WEBSOCKET # CORE SECTION: ASYNC TASK / WEBSOCKET
# ////////////////////////////////////////////////////////////////////////// # //////////////////////////////////////////////////////////////////////////
@@ -143,10 +186,41 @@ async def stop_all_roles():
print(f"[WARN] Error closing widget: {e}") print(f"[WARN] Error closing widget: {e}")
overlay_widgets.clear() overlay_widgets.clear()
# ---------------- Heartbeat ----------------
async def send_heartbeat():
"""
Periodically send agent heartbeat to the server so the Devices page can
show hostname, OS, and last_seen.
"""
while True:
try:
payload = {
"agent_id": AGENT_ID,
"hostname": socket.gethostname(),
"agent_operating_system": CONFIG.data.get("agent_operating_system", detect_agent_os()),
"last_seen": int(time.time())
}
await sio.emit("agent_heartbeat", payload)
except Exception as e:
print(f"[WARN] heartbeat emit failed: {e}")
await asyncio.sleep(5)
@sio.event @sio.event
async def connect(): async def connect():
print(f"[WebSocket] Connected to Borealis Server with Agent ID: {AGENT_ID}") print(f"[WebSocket] Connected to Borealis Server with Agent ID: {AGENT_ID}")
await sio.emit('connect_agent', {"agent_id": AGENT_ID}) await sio.emit('connect_agent', {"agent_id": AGENT_ID})
# Send an immediate heartbeat so the UI can populate instantly.
try:
await sio.emit("agent_heartbeat", {
"agent_id": AGENT_ID,
"hostname": socket.gethostname(),
"agent_operating_system": CONFIG.data.get("agent_operating_system", detect_agent_os()),
"last_seen": int(time.time())
})
except Exception as e:
print(f"[WARN] initial heartbeat failed: {e}")
await sio.emit('request_config', {"agent_id": AGENT_ID}) await sio.emit('request_config', {"agent_id": AGENT_ID})
@sio.event @sio.event
@@ -156,7 +230,7 @@ async def disconnect():
CONFIG.data['regions'].clear() CONFIG.data['regions'].clear()
CONFIG._write() CONFIG._write()
# ////////////////////////////////////////////////////////////////////////// # //////////////////////////////////////////////////////////////////////////
# CORE SECTION: AGENT CONFIG MANAGEMENT / WINDOW MANAGEMENT # CORE SECTION: AGENT CONFIG MANAGEMENT / WINDOW MANAGEMENT
# ////////////////////////////////////////////////////////////////////////// # //////////////////////////////////////////////////////////////////////////
@sio.on('agent_config') @sio.on('agent_config')
@@ -265,7 +339,7 @@ class ScreenshotRegion(QtWidgets.QWidget):
bar_width = overlay_green_thickness * 6 bar_width = overlay_green_thickness * 6
bar_height = overlay_green_thickness bar_height = overlay_green_thickness
bar_x = (w - bar_width) // 2 bar_x = (w - bar_width) // 2
bar_y = 6 # 68 px down from top bar_y = 6 # 6-8 px down from top
p.setBrush(QtGui.QColor(0,191,255)) # Borealis Blue p.setBrush(QtGui.QColor(0,191,255)) # Borealis Blue
p.drawRect(bar_x, bar_y - bar_height - 10, bar_width, bar_height * 4) # 2px padding above green bar p.drawRect(bar_x, bar_y - bar_height - 10, bar_width, bar_height * 4) # 2px padding above green bar
@@ -380,9 +454,8 @@ async def macro_task(cfg):
while True: while True:
# Always re-fetch config (hot reload support) # Always re-fetch config (hot reload support)
# (In reality, you might want to deep-copy or re-provision on config update, but for MVP we refetch each tick)
window_handle = cfg.get('window_handle') window_handle = cfg.get('window_handle')
macro_type = cfg.get('macro_type', 'keypress') # Now matches UI config macro_type = cfg.get('macro_type', 'keypress')
operation_mode = cfg.get('operation_mode', 'Continuous') operation_mode = cfg.get('operation_mode', 'Continuous')
key = cfg.get('key') key = cfg.get('key')
text = cfg.get('text') text = cfg.get('text')
@@ -391,9 +464,8 @@ async def macro_task(cfg):
random_min = int(cfg.get('random_min', 750)) random_min = int(cfg.get('random_min', 750))
random_max = int(cfg.get('random_max', 950)) random_max = int(cfg.get('random_max', 950))
active = cfg.get('active', True) active = cfg.get('active', True)
trigger = int(cfg.get('trigger', 0)) # For trigger modes; default 0 if not set trigger = int(cfg.get('trigger', 0))
# Define helper for error reporting
async def emit_macro_status(success, message=""): async def emit_macro_status(success, message=""):
await sio.emit('macro_status', { await sio.emit('macro_status', {
"agent_id": AGENT_ID, "agent_id": AGENT_ID,
@@ -403,7 +475,6 @@ async def macro_task(cfg):
"timestamp": int(asyncio.get_event_loop().time() * 1000) "timestamp": int(asyncio.get_event_loop().time() * 1000)
}) })
# Stopped state (paused from UI)
if not (active is True or str(active).lower() == "true"): if not (active is True or str(active).lower() == "true"):
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
continue continue
@@ -411,32 +482,22 @@ async def macro_task(cfg):
try: try:
send_macro = False send_macro = False
# Operation Mode Logic
if operation_mode == "Run Once": if operation_mode == "Run Once":
if not has_run_once: if not has_run_once:
send_macro = True send_macro = True
has_run_once = True # Only run once, then stop has_run_once = True
elif operation_mode == "Continuous": elif operation_mode == "Continuous":
send_macro = True # Always run every interval send_macro = True
elif operation_mode == "Trigger-Continuous": elif operation_mode == "Trigger-Continuous":
# Only run while trigger is "1" send_macro = (trigger == 1)
if trigger == 1:
send_macro = True
else:
send_macro = False
elif operation_mode == "Trigger-Once": elif operation_mode == "Trigger-Once":
# Run only on rising edge: 0->1
if last_trigger_value == 0 and trigger == 1: if last_trigger_value == 0 and trigger == 1:
send_macro = True send_macro = True
else:
send_macro = False
last_trigger_value = trigger last_trigger_value = trigger
else: else:
# Unknown mode: default to "Continuous"
send_macro = True send_macro = True
if send_macro: if send_macro:
# Actually perform macro
if macro_type == 'keypress' and key: if macro_type == 'keypress' and key:
result = macro_engines.send_keypress_to_window(window_handle, key) result = macro_engines.send_keypress_to_window(window_handle, key)
elif macro_type == 'typed_text' and text: elif macro_type == 'typed_text' and text:
@@ -446,7 +507,6 @@ async def macro_task(cfg):
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
continue continue
# Result may be True or (False, error)
if isinstance(result, tuple): if isinstance(result, tuple):
success, err = result success, err = result
else: else:
@@ -457,10 +517,8 @@ async def macro_task(cfg):
else: else:
await emit_macro_status(False, err or "Unknown macro engine failure") await emit_macro_status(False, err or "Unknown macro engine failure")
else: else:
# No macro to send this cycle, just idle
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
# Timing: only wait if we did send macro this tick
if send_macro: if send_macro:
if randomize: if randomize:
ms = random.randint(random_min, random_max) ms = random.randint(random_min, random_max)
@@ -468,7 +526,7 @@ async def macro_task(cfg):
ms = interval_ms ms = interval_ms
await asyncio.sleep(ms / 1000.0) await asyncio.sleep(ms / 1000.0)
else: else:
await asyncio.sleep(0.1) # No macro action: check again soon await asyncio.sleep(0.1)
except asyncio.CancelledError: except asyncio.CancelledError:
print(f"[TASK] Macro role {nid} cancelled.") print(f"[TASK] Macro role {nid} cancelled.")
@@ -484,18 +542,21 @@ async def macro_task(cfg):
async def config_watcher(): async def config_watcher():
print("[DEBUG] Starting config watcher") print("[DEBUG] Starting config watcher")
while True: while True:
CONFIG.watch(); await asyncio.sleep(CONFIG.data.get('config_file_watcher_interval',2)) CONFIG.watch()
await asyncio.sleep(CONFIG.data.get('config_file_watcher_interval',2))
# ---------------- Persistent Idle Task ---------------- # ---------------- Persistent Idle Task ----------------
async def idle_task(): async def idle_task():
print("[Agent] Entering idle state. Awaiting instructions...") print("[Agent] Entering idle state. Awaiting instructions...")
try: try:
while True: while True:
await asyncio.sleep(60); print("[DEBUG] Idle task still alive.") await asyncio.sleep(60)
print("[DEBUG] Idle task still alive.")
except asyncio.CancelledError: except asyncio.CancelledError:
print("[FATAL] Idle task was cancelled!") print("[FATAL] Idle task was cancelled!")
except Exception as e: except Exception as e:
print(f"[FATAL] Idle task crashed: {e}"); traceback.print_exc() print(f"[FATAL] Idle task crashed: {e}")
traceback.print_exc()
# ---------------- Dummy Qt Widget to Prevent Exit ---------------- # ---------------- Dummy Qt Widget to Prevent Exit ----------------
class PersistentWindow(QtWidgets.QWidget): class PersistentWindow(QtWidgets.QWidget):
@@ -530,6 +591,8 @@ if __name__=='__main__':
background_tasks.append(loop.create_task(config_watcher())) background_tasks.append(loop.create_task(config_watcher()))
background_tasks.append(loop.create_task(connect_loop())) background_tasks.append(loop.create_task(connect_loop()))
background_tasks.append(loop.create_task(idle_task())) background_tasks.append(loop.create_task(idle_task()))
# Start periodic heartbeats
background_tasks.append(loop.create_task(send_heartbeat()))
loop.run_forever() loop.run_forever()
except Exception as e: except Exception as e:
print(f"[FATAL] Event loop crashed: {e}") print(f"[FATAL] Event loop crashed: {e}")

View File

@@ -115,32 +115,12 @@ def _extract_tab_name(obj: Dict) -> str:
def load_workflows(): def load_workflows():
""" """
Scan <ProjectRoot>/Workflows for *.json files and return a table-friendly list. Scan <ProjectRoot>/Workflows for *.json files and return a table-friendly list.
Response:
{
"root": "<absolute path to Workflows>",
"workflows": [
{
"name": "FolderA > Sub > File.json", # breadcrumb styled name for table display
"breadcrumb_prefix": "FolderA > Sub", # prefix only, to allow UI styling
"file_name": "File.json", # base filename
"rel_path": "FolderA/Sub/File.json", # path relative to Workflows
"tab_name": "Optional Tab Name", # best-effort read from JSON (may be "")
"description": "", # placeholder for future use
"category": "", # placeholder for future use
"last_edited": "YYYY-MM-DDTHH:MM:SS", # local time ISO-like string
"last_edited_epoch": 1712345678.123 # numeric, for client-side sorting
},
...
]
}
""" """
# Resolve <ProjectRoot>/Workflows relative to this file at <ProjectRoot>/Data/server.py # Resolve <ProjectRoot>/Workflows relative to this file at <ProjectRoot>/Data/server.py
workflows_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "Workflows")) workflows_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "Workflows"))
results: List[Dict] = [] results: List[Dict] = []
if not os.path.isdir(workflows_root): if not os.path.isdir(workflows_root):
# Directory missing is not a hard error; return empty set and the resolved path for visibility.
return jsonify({ return jsonify({
"root": workflows_root, "root": workflows_root,
"workflows": [], "workflows": [],
@@ -153,19 +133,16 @@ def load_workflows():
continue continue
full_path = os.path.join(root, fname) full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, workflows_root) # e.g. SuperStuff/Example.json rel_path = os.path.relpath(full_path, workflows_root)
# Build breadcrumb-style display name: "SuperStuff > Example.json"
parts = rel_path.split(os.sep) parts = rel_path.split(os.sep)
folder_parts = parts[:-1] folder_parts = parts[:-1]
breadcrumb_prefix = " > ".join(folder_parts) if folder_parts else "" breadcrumb_prefix = " > ".join(folder_parts) if folder_parts else ""
display_name = f"{breadcrumb_prefix} > {fname}" if breadcrumb_prefix else fname display_name = f"{breadcrumb_prefix} > {fname}" if breadcrumb_prefix else fname
# Best-effort read of tab name (not required for now)
obj = _safe_read_json(full_path) obj = _safe_read_json(full_path)
tab_name = _extract_tab_name(obj) tab_name = _extract_tab_name(obj)
# File timestamps
try: try:
mtime = os.path.getmtime(full_path) mtime = os.path.getmtime(full_path)
except Exception: except Exception:
@@ -184,7 +161,6 @@ def load_workflows():
"last_edited_epoch": mtime "last_edited_epoch": mtime
}) })
# Sort newest-first by modification time
results.sort(key=lambda x: x.get("last_edited_epoch", 0.0), reverse=True) results.sort(key=lambda x: x.get("last_edited_epoch", 0.0), reverse=True)
return jsonify({ return jsonify({
@@ -195,13 +171,18 @@ def load_workflows():
# --------------------------------------------- # ---------------------------------------------
# Borealis Agent API Endpoints # Borealis Agent API Endpoints
# --------------------------------------------- # ---------------------------------------------
# These endpoints handle agent registration, provisioning, and image streaming. # These endpoints handle agent registration, provisioning, image streaming, and heartbeats.
registered_agents = {} # Shape expected by UI for each agent:
agent_configurations = {} # { "agent_id", "hostname", "agent_operating_system", "last_seen", "status" }
latest_images = {} registered_agents: Dict[str, Dict] = {}
agent_configurations: Dict[str, Dict] = {}
latest_images: Dict[str, Dict] = {}
@app.route("/api/agents") @app.route("/api/agents")
def get_agents(): def get_agents():
"""
Return a dict keyed by agent_id with hostname, os, last_seen, status.
"""
return jsonify(registered_agents) return jsonify(registered_agents)
@app.route("/api/agent/provision", methods=["POST"]) @app.route("/api/agent/provision", methods=["POST"])
@@ -231,7 +212,6 @@ def proxy():
if not target: if not target:
return {"error": "Missing ?url="}, 400 return {"error": "Missing ?url="}, 400
# Forward method, headers, body
upstream = requests.request( upstream = requests.request(
method = request.method, method = request.method,
url = target, url = target,
@@ -251,7 +231,6 @@ def proxy():
# --------------------------------------------- # ---------------------------------------------
# Live Screenshot Viewer for Debugging # Live Screenshot Viewer for Debugging
# --------------------------------------------- # ---------------------------------------------
# Serves an HTML canvas that shows real-time screenshots from a given agent+node.
@app.route("/api/agent/<agent_id>/node/<node_id>/screenshot/live") @app.route("/api/agent/<agent_id>/node/<node_id>/screenshot/live")
def screenshot_node_viewer(agent_id, node_id): def screenshot_node_viewer(agent_id, node_id):
return f""" return f"""
@@ -326,25 +305,51 @@ def receive_screenshot_task(data):
"timestamp": time.time() "timestamp": time.time()
} }
# Emit the full payload, including geometry (even if image is empty)
emit("agent_screenshot_task", data, broadcast=True) emit("agent_screenshot_task", data, broadcast=True)
@socketio.on("connect_agent") @socketio.on("connect_agent")
def connect_agent(data): def connect_agent(data):
agent_id = data.get("agent_id") """
hostname = data.get("hostname", "unknown") Initial agent connect. Agent may only send agent_id here.
Hostname/OS are filled in by subsequent heartbeats.
"""
agent_id = (data or {}).get("agent_id")
if not agent_id:
return
print(f"Agent connected: {agent_id}") print(f"Agent connected: {agent_id}")
registered_agents[agent_id] = { rec = registered_agents.setdefault(agent_id, {})
"agent_id": agent_id, rec["agent_id"] = agent_id
"hostname": hostname, rec["hostname"] = rec.get("hostname", "unknown")
"last_seen": time.time(), rec["agent_operating_system"] = rec.get("agent_operating_system", "-")
"status": "orphaned" if agent_id not in agent_configurations else "provisioned" rec["last_seen"] = int(time.time())
} rec["status"] = "provisioned" if agent_id in agent_configurations else "orphaned"
@socketio.on("agent_heartbeat")
def on_agent_heartbeat(data):
"""
Heartbeat payload from agent:
{ agent_id, hostname, agent_operating_system, last_seen }
Updates registry so Devices view can display OS/hostname and recency.
"""
if not data:
return
agent_id = data.get("agent_id")
if not agent_id:
return
rec = registered_agents.setdefault(agent_id, {})
rec["agent_id"] = agent_id
if data.get("hostname"):
rec["hostname"] = data.get("hostname")
if data.get("agent_operating_system"):
rec["agent_operating_system"] = data.get("agent_operating_system")
rec["last_seen"] = int(data.get("last_seen") or time.time())
rec["status"] = "provisioned" if agent_id in agent_configurations else rec.get("status", "orphaned")
@socketio.on("request_config") @socketio.on("request_config")
def send_agent_config(data): def send_agent_config(data):
agent_id = data.get("agent_id") agent_id = (data or {}).get("agent_id")
config = agent_configurations.get(agent_id) config = agent_configurations.get(agent_id)
if config: if config:
emit("agent_config", config) emit("agent_config", config)
@@ -386,9 +391,6 @@ def handle_list_agent_windows(data):
""" """
Forwards list_agent_windows event to all agents (or filter for a specific agent_id). Forwards list_agent_windows event to all agents (or filter for a specific agent_id).
""" """
agent_id = data.get("agent_id")
# You can target a specific agent if you track rooms/sessions.
# For now, broadcast to all agents so the correct one can reply.
emit("list_agent_windows", data, broadcast=True) emit("list_agent_windows", data, broadcast=True)
@socketio.on("agent_window_list") @socketio.on("agent_window_list")
@@ -398,11 +400,9 @@ def handle_agent_window_list(data):
""" """
emit("agent_window_list", data, broadcast=True) emit("agent_window_list", data, broadcast=True)
# --------------------------------------------- # ---------------------------------------------
# Server Launch # Server Launch
# --------------------------------------------- # ---------------------------------------------
if __name__ == "__main__": if __name__ == "__main__":
import eventlet.wsgi # Use SocketIO runner so WebSocket transport works with eventlet.
listener = eventlet.listen(('0.0.0.0', 5000)) socketio.run(app, host="0.0.0.0", port=5000)
eventlet.wsgi.server(listener, app)