diff --git a/Data/Agent/borealis-agent.py b/Data/Agent/borealis-agent.py index c88a5f3..7c10a6c 100644 --- a/Data/Agent/borealis-agent.py +++ b/Data/Agent/borealis-agent.py @@ -1,10 +1,9 @@ import sys import uuid import time -import json import base64 import threading -import requests +import socketio from io import BytesIO import socket @@ -12,131 +11,56 @@ from PyQt5 import QtCore, QtGui, QtWidgets from PIL import ImageGrab # ---------------- Configuration ---------------- -SERVER_URL = "https://borealis.bunny-lab.io" # Production URL Example -#SERVER_URL = "http://localhost:5000" # Development URL Example -CHECKIN_ENDPOINT = f"{SERVER_URL}/api/agent/checkin" -CONFIG_ENDPOINT = f"{SERVER_URL}/api/agent/config" -DATA_POST_ENDPOINT = f"{SERVER_URL}/api/agent/data" -HEARTBEAT_ENDPOINT = f"{SERVER_URL}/api/agent/heartbeat" +SERVER_URL = "http://localhost:5000" # WebSocket-enabled server URL HOSTNAME = socket.gethostname().lower() RANDOM_SUFFIX = uuid.uuid4().hex[:8] AGENT_ID = f"{HOSTNAME}-agent-{RANDOM_SUFFIX}" -# Default poll interval for config. Adjust as needed. -CONFIG_POLL_INTERVAL = 5 - # ---------------- State ---------------- app_instance = None region_widget = None -capture_thread_started = False current_interval = 1000 config_ready = threading.Event() overlay_visible = True -heartbeat_thread_started = False -# Track if we have a valid connection to Borealis -IS_CONNECTED = False -CONNECTION_LOST_REPORTED = False - -# Keep a copy of the last config to avoid repeated provisioning LAST_CONFIG = {} -# ---------------- Signal Bridge ---------------- -class RegionLauncher(QtCore.QObject): - trigger = QtCore.pyqtSignal(int, int, int, int) +# WebSocket client setup +sio = socketio.Client() - def __init__(self): - super().__init__() - self.trigger.connect(self.handle) +# ---------------- WebSocket Handlers ---------------- +@sio.event +def connect(): + print(f"[WS CONNECTED] Agent ID: {AGENT_ID} connected to Borealis.") + sio.emit('connect_agent', {"agent_id": AGENT_ID, "hostname": HOSTNAME}) + sio.emit('request_config', {"agent_id": AGENT_ID}) - def handle(self, x, y, w, h): - launch_region(x, y, w, h) +@sio.event +def disconnect(): + print("[WS DISCONNECTED] Lost connection to Borealis server.") -region_launcher = None +@sio.on('agent_config') +def on_agent_config(config): + global current_interval, overlay_visible, LAST_CONFIG -# ---------------- Helper: Reconnect ---------------- -def reconnect(): - """ - Attempt to connect to Borealis until successful. - Sets IS_CONNECTED = True upon success. - """ - global IS_CONNECTED, CONNECTION_LOST_REPORTED - while not IS_CONNECTED: - try: - requests.post(CHECKIN_ENDPOINT, json={"agent_id": AGENT_ID, "hostname": HOSTNAME}, timeout=5) - IS_CONNECTED = True - CONNECTION_LOST_REPORTED = False - print(f"[INFO] Agent ID: {AGENT_ID} connected to Borealis.") - except Exception: - if not CONNECTION_LOST_REPORTED: - print(f"[CONNECTION LOST] Attempting to Reconnect to Borealis Server at {SERVER_URL}") - CONNECTION_LOST_REPORTED = True - time.sleep(10) + if config != LAST_CONFIG: + print("[PROVISIONING] Received new configuration from Borealis.") + x = config.get("x", 100) + y = config.get("y", 100) + w = config.get("w", 300) + h = config.get("h", 200) + current_interval = config.get("interval", 1000) + overlay_visible = config.get("visible", True) -# ---------------- Networking ---------------- -def poll_for_config(): - """ - Polls for agent configuration from Borealis. - Returns a config dict or None on failure. - """ - try: - res = requests.get(CONFIG_ENDPOINT, params={"agent_id": AGENT_ID}, timeout=5) - if res.status_code == 200: - return res.json() + if not region_widget: + region_launcher.trigger.emit(x, y, w, h) else: - print(f"[ERROR] Config polling returned status: {res.status_code}") - except Exception: - # We'll let the config_loop handle setting IS_CONNECTED = False - pass - return None + region_widget.setGeometry(x, y, w, h) + region_widget.setVisible(overlay_visible) -def send_image_data(image): - """ - Attempts to POST screenshot data to Borealis if IS_CONNECTED is True. - """ - global IS_CONNECTED, CONNECTION_LOST_REPORTED - if not IS_CONNECTED: - return # Skip sending if not connected - - try: - buffer = BytesIO() - image.save(buffer, format="PNG") - encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") - - response = requests.post(DATA_POST_ENDPOINT, json={ - "agent_id": AGENT_ID, - "type": "screenshot", - "image_base64": encoded - }, timeout=5) - - if response.status_code != 200: - print(f"[ERROR] Screenshot POST failed: {response.status_code} - {response.text}") - except Exception as e: - if IS_CONNECTED and not CONNECTION_LOST_REPORTED: - # Only report once - print(f"[CONNECTION LOST] Attempting to Reconnect to Borealis Server at {SERVER_URL}") - CONNECTION_LOST_REPORTED = True - IS_CONNECTED = False - -def send_heartbeat(): - """ - Attempts to send heartbeat if IS_CONNECTED is True. - """ - global IS_CONNECTED, CONNECTION_LOST_REPORTED - if not IS_CONNECTED: - return - - try: - response = requests.get(HEARTBEAT_ENDPOINT, params={"agent_id": AGENT_ID}, timeout=5) - if response.status_code != 200: - print(f"[ERROR] Heartbeat returned status: {response.status_code}") - raise ValueError("Heartbeat not 200") - except Exception: - if IS_CONNECTED and not CONNECTION_LOST_REPORTED: - print(f"[CONNECTION LOST] Attempting to Reconnect to Borealis Server at {SERVER_URL}") - CONNECTION_LOST_REPORTED = True - IS_CONNECTED = False + LAST_CONFIG = config + config_ready.set() # ---------------- Region Overlay ---------------- class ScreenshotRegion(QtWidgets.QWidget): @@ -161,12 +85,10 @@ class ScreenshotRegion(QtWidgets.QWidget): painter = QtGui.QPainter(self) painter.setRenderHint(QtGui.QPainter.Antialiasing) - # Transparent fill painter.setBrush(QtCore.Qt.transparent) painter.setPen(QtGui.QPen(QtGui.QColor(0, 255, 0), 2)) painter.drawRect(self.rect()) - # Resize Handle Visual (Bottom-Right) handle_rect = QtCore.QRect( self.width() - self.resize_handle_size, self.height() - self.resize_handle_size, @@ -199,112 +121,49 @@ class ScreenshotRegion(QtWidgets.QWidget): geo = self.geometry() return geo.x(), geo.y(), geo.width(), geo.height() -# ---------------- Threads ---------------- +# ---------------- Screenshot Capture ---------------- def capture_loop(): - """ - Continuously captures the user-defined region every current_interval ms if connected. - """ - global current_interval print("[INFO] Screenshot capture loop started") config_ready.wait() while region_widget is None: - print("[WAIT] Waiting for region widget to initialize...") time.sleep(0.2) - print(f"[INFO] Agent Capturing Region: x:{region_widget.x()} y:{region_widget.y()} w:{region_widget.width()} h:{region_widget.height()}") - while True: - if overlay_visible and IS_CONNECTED: + if overlay_visible: x, y, w, h = region_widget.get_geometry() try: img = ImageGrab.grab(bbox=(x, y, x + w, y + h)) - send_image_data(img) + buffer = BytesIO() + img.save(buffer, format="PNG") + encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8") + + sio.emit('screenshot', { + 'agent_id': AGENT_ID, + 'image_base64': encoded_image + }) except Exception as e: print(f"[ERROR] Screenshot error: {e}") + time.sleep(current_interval / 1000) -def heartbeat_loop(): - """ - Heartbeat every 10 seconds if connected. - If it fails, we set IS_CONNECTED=False, and rely on config_loop to reconnect. - """ - while True: - send_heartbeat() - time.sleep(10) +# ---------------- UI Launcher ---------------- +class RegionLauncher(QtCore.QObject): + trigger = QtCore.pyqtSignal(int, int, int, int) -def config_loop(): - """ - 1) Reconnect (if needed) until the agent can contact Borealis - 2) Poll for config. If new config is different from LAST_CONFIG, re-provision - 3) If poll_for_config fails or we see connection issues, set IS_CONNECTED=False - and loop back to reconnect() on next iteration - """ - global capture_thread_started, heartbeat_thread_started - global current_interval, overlay_visible, LAST_CONFIG, IS_CONNECTED + def __init__(self): + super().__init__() + self.trigger.connect(self.handle) - while True: - # If we aren't connected, reconnect - if not IS_CONNECTED: - reconnect() + def handle(self, x, y, w, h): + launch_region(x, y, w, h) - # Attempt to get config - config = poll_for_config() - if config is None: - # This means we had a poll failure, so mark disconnected and retry. - IS_CONNECTED = False - continue - - # If it has a "task" : "screenshot" - if config.get("task") == "screenshot": - # Compare to last known config - if config != LAST_CONFIG: - # Something changed, so provision - print("[PROVISIONING] Agent Provisioning Command Issued by Borealis") - - x = config.get("x", 100) - y = config.get("y", 100) - w = config.get("w", 300) - h = config.get("h", 200) - current_interval = config.get("interval", 1000) - overlay_visible = config.get("visible", True) - - print('[PROVISIONING] Agent Configured as "Screenshot" Collector') - print(f'[PROVISIONING] Polling Rate: {current_interval / 1000:.1f}s') - - # Show or move region widget - if not region_widget: - region_launcher.trigger.emit(x, y, w, h) - else: - region_widget.setGeometry(x, y, w, h) - region_widget.setVisible(overlay_visible) - - LAST_CONFIG = config - - # Make sure capture thread is started - if not capture_thread_started: - threading.Thread(target=capture_loop, daemon=True).start() - capture_thread_started = True - - # Make sure heartbeat thread is started - if not heartbeat_thread_started: - threading.Thread(target=heartbeat_loop, daemon=True).start() - heartbeat_thread_started = True - - # Signal that provisioning is done so capture thread can run - config_ready.set() - - # Sleep before next poll - time.sleep(CONFIG_POLL_INTERVAL) +region_launcher = None def launch_region(x, y, w, h): - """ - Initializes the screenshot region overlay widget exactly once. - """ global region_widget if region_widget: return - print("[INFO] Agent Starting...") region_widget = ScreenshotRegion(x, y, w, h) region_widget.show() @@ -313,8 +172,8 @@ if __name__ == "__main__": app_instance = QtWidgets.QApplication(sys.argv) region_launcher = RegionLauncher() - # Start the config loop in a background thread - threading.Thread(target=config_loop, daemon=True).start() + sio.connect(SERVER_URL) + + threading.Thread(target=capture_loop, daemon=True).start() - # Enter Qt main event loop sys.exit(app_instance.exec_()) diff --git a/Data/WebUI/src/nodes/Agent/Node_Borealis_Agent.jsx b/Data/WebUI/src/nodes/Agent/Node_Borealis_Agent.jsx index 227e22b..08b67a5 100644 --- a/Data/WebUI/src/nodes/Agent/Node_Borealis_Agent.jsx +++ b/Data/WebUI/src/nodes/Agent/Node_Borealis_Agent.jsx @@ -1,41 +1,38 @@ -import React, { useEffect, useState } from "react"; +import React, { useEffect, useState, useRef } from "react"; import { Handle, Position, useReactFlow } from "reactflow"; +import { io } from "socket.io-client"; + +const socket = io(); const APINode = ({ id, data }) => { const { setNodes } = useReactFlow(); const [agents, setAgents] = useState([]); const [selectedAgent, setSelectedAgent] = useState(data.agent_id || ""); const [selectedType, setSelectedType] = useState(data.data_type || "screenshot"); - const [imageData, setImageData] = useState(""); const [intervalMs, setIntervalMs] = useState(data.interval || 1000); const [paused, setPaused] = useState(false); const [overlayVisible, setOverlayVisible] = useState(true); + const [imageData, setImageData] = useState(""); - // Refresh agents every 5s useEffect(() => { - const fetchAgents = () => fetch("/api/agents").then(res => res.json()).then(setAgents); - fetchAgents(); - const interval = setInterval(fetchAgents, 5000); + fetch("/api/agents").then(res => res.json()).then(setAgents); + const interval = setInterval(() => { + fetch("/api/agents").then(res => res.json()).then(setAgents); + }, 5000); return () => clearInterval(interval); }, []); - // Pull image if agent provisioned useEffect(() => { - if (!selectedAgent || paused) return; - const interval = setInterval(() => { - fetch(`/api/agent/image?agent_id=${selectedAgent}`) - .then(res => res.json()) - .then(json => { - if (json.image_base64) { - setImageData(json.image_base64); - window.BorealisValueBus = window.BorealisValueBus || {}; - window.BorealisValueBus[id] = json.image_base64; - } - }) - .catch(() => { }); - }, intervalMs); - return () => clearInterval(interval); - }, [selectedAgent, id, paused, intervalMs]); + socket.on('new_screenshot', (data) => { + if (data.agent_id === selectedAgent) { + setImageData(data.image_base64); + window.BorealisValueBus = window.BorealisValueBus || {}; + window.BorealisValueBus[id] = data.image_base64; + } + }); + + return () => socket.off('new_screenshot'); + }, [selectedAgent, id]); const provisionAgent = () => { if (!selectedAgent) return; @@ -53,16 +50,12 @@ const APINode = ({ id, data }) => { task: selectedType }) }).then(() => { + socket.emit('request_config', { agent_id: selectedAgent }); setNodes(nds => nds.map(n => n.id === id ? { ...n, - data: { - ...n.data, - agent_id: selectedAgent, - data_type: selectedType, - interval: intervalMs - } + data: { agent_id: selectedAgent, data_type: selectedType, interval: intervalMs } } : n ) @@ -73,13 +66,7 @@ const APINode = ({ id, data }) => { const toggleOverlay = () => { const newVisibility = !overlayVisible; setOverlayVisible(newVisibility); - if (selectedAgent) { - fetch("/api/agent/overlay_visibility", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ agent_id: selectedAgent, visible: newVisibility }) - }); - } + provisionAgent(); }; return ( @@ -154,7 +141,7 @@ const APINode = ({ id, data }) => { export default { type: "API_Data_Collector", label: "API Data Collector", - description: "Connects to a remote agent via API and collects data such as screenshots, OCR results, and more.", - content: "Publishes agent-collected data into the workflow ValueBus.", + description: "Real-time provisioning and image collection via WebSocket.", + content: "Publishes real-time agent-collected data into the workflow.", component: APINode -}; \ No newline at end of file +}; diff --git a/Data/server.py b/Data/server.py index cfc79b8..bdfe17b 100644 --- a/Data/server.py +++ b/Data/server.py @@ -1,4 +1,5 @@ from flask import Flask, request, jsonify, send_from_directory +from flask_socketio import SocketIO, emit import time import os @@ -10,6 +11,7 @@ if not os.path.exists(build_folder): print("WARNING: web-interface build folder not found. Please build your React app.") app = Flask(__name__, static_folder=build_folder, static_url_path="/") +socketio = SocketIO(app, cors_allowed_origins="*") # Allow cross-origin WebSocket connections @app.route("/") def serve_index(): @@ -18,7 +20,6 @@ def serve_index(): return send_from_directory(build_folder, "index.html") return "

Borealis React App Code Not Found

Please re-deploy Borealis Workflow Automation Tool

", 404 -# Wildcard route to serve React for sub-routes (e.g., /workflow) @app.route("/") def serve_react_app(path): full_path = os.path.join(build_folder, path) @@ -27,12 +28,13 @@ def serve_react_app(path): return send_from_directory(build_folder, "index.html") # --------------------------------------------- -# Borealis Agent API Endpoints +# Borealis Agent Management (Hybrid: API + WebSockets) # --------------------------------------------- registered_agents = {} agent_configurations = {} latest_images = {} +# API Endpoints (kept for provisioning and status) @app.route("/api/agent/checkin", methods=["POST"]) def agent_checkin(): data = request.json @@ -47,68 +49,75 @@ def agent_checkin(): } return jsonify({"status": "ok"}) -@app.route("/api/agent/reset", methods=["POST"]) -def reset_agent(): - agent_id = request.json.get("agent_id") - if agent_id in agents: - agents[agent_id]["status"] = "orphaned" - agents[agent_id]["config"] = None - latest_images.pop(agent_id, None) - return jsonify({"status": "reset"}), 200 - return jsonify({"error": "Agent not found"}), 404 - @app.route("/api/agent/provision", methods=["POST"]) def provision_agent(): data = request.json agent_id = data.get("agent_id") config = { - "task": "screenshot", + "task": data.get("task", "screenshot"), "x": data.get("x", 100), "y": data.get("y", 100), "w": data.get("w", 300), "h": data.get("h", 200), - "interval": data.get("interval", 1000) + "interval": data.get("interval", 1000), + "visible": data.get("visible", True) } agent_configurations[agent_id] = config if agent_id in registered_agents: registered_agents[agent_id]["status"] = "provisioned" return jsonify({"status": "provisioned"}) -@app.route("/api/agent/config") -def get_agent_config(): - agent_id = request.args.get("agent_id") - config = agent_configurations.get(agent_id) - return jsonify(config or {}) - -@app.route("/api/agent/data", methods=["POST"]) -def agent_data(): - data = request.json - agent_id = data.get("agent_id") - image = data.get("image_base64") - - if not agent_id or not image: - return jsonify({"error": "Missing data"}), 400 - - latest_images[agent_id] = { - "image_base64": image, - "timestamp": time.time() - } - return jsonify({"status": "received"}) - -@app.route("/api/agent/image") -def get_latest_image(): - agent_id = request.args.get("agent_id") - entry = latest_images.get(agent_id) - if entry: - return jsonify(entry) - return jsonify({"error": "No image"}), 404 - @app.route("/api/agents") def get_agents(): return jsonify(registered_agents) +# WebSocket Handlers +@socketio.on('connect_agent') +def handle_agent_connect(data): + agent_id = data.get('agent_id') + hostname = data.get('hostname', 'unknown') + + registered_agents[agent_id] = { + "agent_id": agent_id, + "hostname": hostname, + "last_seen": time.time(), + "status": "connected" + } + + print(f"Agent connected: {agent_id}") + emit('agent_connected', {'status': 'connected'}) + +@socketio.on('screenshot') +def handle_screenshot(data): + agent_id = data.get('agent_id') + image_base64 = data.get('image_base64') + + if agent_id and image_base64: + latest_images[agent_id] = { + 'image_base64': image_base64, + 'timestamp': time.time() + } + + # Real-time broadcast to connected dashboards + emit('new_screenshot', { + 'agent_id': agent_id, + 'image_base64': image_base64 + }, broadcast=True) + + emit('screenshot_received', {'status': 'ok'}) + print(f"Screenshot received from agent: {agent_id}") + else: + emit('error', {'message': 'Invalid screenshot data'}) + +@socketio.on('request_config') +def handle_request_config(data): + agent_id = data.get('agent_id') + config = agent_configurations.get(agent_id, {}) + + emit('agent_config', config) + # --------------------------------------------- # Server Start # --------------------------------------------- if __name__ == "__main__": - app.run(host="0.0.0.0", port=5000, debug=False) + socketio.run(app, host="0.0.0.0", port=5000, debug=False) \ No newline at end of file diff --git a/readme.md b/readme.md index bdf86a6..114f862 100644 --- a/readme.md +++ b/readme.md @@ -52,4 +52,43 @@ The launch script will: ## 🧠 How It Works -Borealis workflows run on **live data propagation**. Each node checks for incoming values (via edges) and processes them on a recurring timer (default: 200ms). This allows for highly reactive, composable logic graphs. \ No newline at end of file +Borealis workflows run on **live data propagation**. Each node checks for incoming values (via edges) and processes them on a recurring timer (default: 200ms). This allows for highly reactive, composable logic graphs. + +## ⚔Reverse Proxy Configuration +If you want to run Borealis behind a reverse proxy (e.g., Traefik), you can set up the following dynamic configuration: +```yml +http: + routers: + borealis: + entryPoints: + - websecure + tls: + certResolver: letsencrypt + service: borealis + rule: "Host(`borealis.bunny-lab.io`) && PathPrefix(`/`)" + middlewares: + - cors-headers + + middlewares: + cors-headers: + headers: + accessControlAllowOriginList: + - "*" + accessControlAllowMethods: + - GET + - POST + - OPTIONS + accessControlAllowHeaders: + - Content-Type + - Upgrade + - Connection + accessControlMaxAge: 100 + addVaryHeader: true + + services: + borealis: + loadBalancer: + servers: + - url: "http://192.168.3.254:5000" + passHostHeader: true +``` \ No newline at end of file