Added Reverse Proxy Example to Readme

This commit is contained in:
2025-04-07 21:03:39 -06:00
parent bcf380e1fa
commit 31e899c842
4 changed files with 171 additions and 277 deletions

View File

@ -1,10 +1,9 @@
import sys import sys
import uuid import uuid
import time import time
import json
import base64 import base64
import threading import threading
import requests import socketio
from io import BytesIO from io import BytesIO
import socket import socket
@ -12,131 +11,56 @@ from PyQt5 import QtCore, QtGui, QtWidgets
from PIL import ImageGrab from PIL import ImageGrab
# ---------------- Configuration ---------------- # ---------------- Configuration ----------------
SERVER_URL = "https://borealis.bunny-lab.io" # Production URL Example SERVER_URL = "http://localhost:5000" # WebSocket-enabled server URL
#SERVER_URL = "http://localhost:5000" # Development URL Example
CHECKIN_ENDPOINT = f"{SERVER_URL}/api/agent/checkin"
CONFIG_ENDPOINT = f"{SERVER_URL}/api/agent/config"
DATA_POST_ENDPOINT = f"{SERVER_URL}/api/agent/data"
HEARTBEAT_ENDPOINT = f"{SERVER_URL}/api/agent/heartbeat"
HOSTNAME = socket.gethostname().lower() HOSTNAME = socket.gethostname().lower()
RANDOM_SUFFIX = uuid.uuid4().hex[:8] RANDOM_SUFFIX = uuid.uuid4().hex[:8]
AGENT_ID = f"{HOSTNAME}-agent-{RANDOM_SUFFIX}" AGENT_ID = f"{HOSTNAME}-agent-{RANDOM_SUFFIX}"
# Default poll interval for config. Adjust as needed.
CONFIG_POLL_INTERVAL = 5
# ---------------- State ---------------- # ---------------- State ----------------
app_instance = None app_instance = None
region_widget = None region_widget = None
capture_thread_started = False
current_interval = 1000 current_interval = 1000
config_ready = threading.Event() config_ready = threading.Event()
overlay_visible = True overlay_visible = True
heartbeat_thread_started = False
# Track if we have a valid connection to Borealis
IS_CONNECTED = False
CONNECTION_LOST_REPORTED = False
# Keep a copy of the last config to avoid repeated provisioning
LAST_CONFIG = {} LAST_CONFIG = {}
# ---------------- Signal Bridge ---------------- # WebSocket client setup
class RegionLauncher(QtCore.QObject): sio = socketio.Client()
trigger = QtCore.pyqtSignal(int, int, int, int)
def __init__(self): # ---------------- WebSocket Handlers ----------------
super().__init__() @sio.event
self.trigger.connect(self.handle) def connect():
print(f"[WS CONNECTED] Agent ID: {AGENT_ID} connected to Borealis.")
sio.emit('connect_agent', {"agent_id": AGENT_ID, "hostname": HOSTNAME})
sio.emit('request_config', {"agent_id": AGENT_ID})
def handle(self, x, y, w, h): @sio.event
launch_region(x, y, w, h) def disconnect():
print("[WS DISCONNECTED] Lost connection to Borealis server.")
region_launcher = None @sio.on('agent_config')
def on_agent_config(config):
global current_interval, overlay_visible, LAST_CONFIG
# ---------------- Helper: Reconnect ---------------- if config != LAST_CONFIG:
def reconnect(): print("[PROVISIONING] Received new configuration from Borealis.")
""" x = config.get("x", 100)
Attempt to connect to Borealis until successful. y = config.get("y", 100)
Sets IS_CONNECTED = True upon success. w = config.get("w", 300)
""" h = config.get("h", 200)
global IS_CONNECTED, CONNECTION_LOST_REPORTED current_interval = config.get("interval", 1000)
while not IS_CONNECTED: overlay_visible = config.get("visible", True)
try:
requests.post(CHECKIN_ENDPOINT, json={"agent_id": AGENT_ID, "hostname": HOSTNAME}, timeout=5)
IS_CONNECTED = True
CONNECTION_LOST_REPORTED = False
print(f"[INFO] Agent ID: {AGENT_ID} connected to Borealis.")
except Exception:
if not CONNECTION_LOST_REPORTED:
print(f"[CONNECTION LOST] Attempting to Reconnect to Borealis Server at {SERVER_URL}")
CONNECTION_LOST_REPORTED = True
time.sleep(10)
# ---------------- Networking ---------------- if not region_widget:
def poll_for_config(): region_launcher.trigger.emit(x, y, w, h)
"""
Polls for agent configuration from Borealis.
Returns a config dict or None on failure.
"""
try:
res = requests.get(CONFIG_ENDPOINT, params={"agent_id": AGENT_ID}, timeout=5)
if res.status_code == 200:
return res.json()
else: else:
print(f"[ERROR] Config polling returned status: {res.status_code}") region_widget.setGeometry(x, y, w, h)
except Exception: region_widget.setVisible(overlay_visible)
# We'll let the config_loop handle setting IS_CONNECTED = False
pass
return None
def send_image_data(image): LAST_CONFIG = config
""" config_ready.set()
Attempts to POST screenshot data to Borealis if IS_CONNECTED is True.
"""
global IS_CONNECTED, CONNECTION_LOST_REPORTED
if not IS_CONNECTED:
return # Skip sending if not connected
try:
buffer = BytesIO()
image.save(buffer, format="PNG")
encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
response = requests.post(DATA_POST_ENDPOINT, json={
"agent_id": AGENT_ID,
"type": "screenshot",
"image_base64": encoded
}, timeout=5)
if response.status_code != 200:
print(f"[ERROR] Screenshot POST failed: {response.status_code} - {response.text}")
except Exception as e:
if IS_CONNECTED and not CONNECTION_LOST_REPORTED:
# Only report once
print(f"[CONNECTION LOST] Attempting to Reconnect to Borealis Server at {SERVER_URL}")
CONNECTION_LOST_REPORTED = True
IS_CONNECTED = False
def send_heartbeat():
"""
Attempts to send heartbeat if IS_CONNECTED is True.
"""
global IS_CONNECTED, CONNECTION_LOST_REPORTED
if not IS_CONNECTED:
return
try:
response = requests.get(HEARTBEAT_ENDPOINT, params={"agent_id": AGENT_ID}, timeout=5)
if response.status_code != 200:
print(f"[ERROR] Heartbeat returned status: {response.status_code}")
raise ValueError("Heartbeat not 200")
except Exception:
if IS_CONNECTED and not CONNECTION_LOST_REPORTED:
print(f"[CONNECTION LOST] Attempting to Reconnect to Borealis Server at {SERVER_URL}")
CONNECTION_LOST_REPORTED = True
IS_CONNECTED = False
# ---------------- Region Overlay ---------------- # ---------------- Region Overlay ----------------
class ScreenshotRegion(QtWidgets.QWidget): class ScreenshotRegion(QtWidgets.QWidget):
@ -161,12 +85,10 @@ class ScreenshotRegion(QtWidgets.QWidget):
painter = QtGui.QPainter(self) painter = QtGui.QPainter(self)
painter.setRenderHint(QtGui.QPainter.Antialiasing) painter.setRenderHint(QtGui.QPainter.Antialiasing)
# Transparent fill
painter.setBrush(QtCore.Qt.transparent) painter.setBrush(QtCore.Qt.transparent)
painter.setPen(QtGui.QPen(QtGui.QColor(0, 255, 0), 2)) painter.setPen(QtGui.QPen(QtGui.QColor(0, 255, 0), 2))
painter.drawRect(self.rect()) painter.drawRect(self.rect())
# Resize Handle Visual (Bottom-Right)
handle_rect = QtCore.QRect( handle_rect = QtCore.QRect(
self.width() - self.resize_handle_size, self.width() - self.resize_handle_size,
self.height() - self.resize_handle_size, self.height() - self.resize_handle_size,
@ -199,112 +121,49 @@ class ScreenshotRegion(QtWidgets.QWidget):
geo = self.geometry() geo = self.geometry()
return geo.x(), geo.y(), geo.width(), geo.height() return geo.x(), geo.y(), geo.width(), geo.height()
# ---------------- Threads ---------------- # ---------------- Screenshot Capture ----------------
def capture_loop(): def capture_loop():
"""
Continuously captures the user-defined region every current_interval ms if connected.
"""
global current_interval
print("[INFO] Screenshot capture loop started") print("[INFO] Screenshot capture loop started")
config_ready.wait() config_ready.wait()
while region_widget is None: while region_widget is None:
print("[WAIT] Waiting for region widget to initialize...")
time.sleep(0.2) time.sleep(0.2)
print(f"[INFO] Agent Capturing Region: x:{region_widget.x()} y:{region_widget.y()} w:{region_widget.width()} h:{region_widget.height()}")
while True: while True:
if overlay_visible and IS_CONNECTED: if overlay_visible:
x, y, w, h = region_widget.get_geometry() x, y, w, h = region_widget.get_geometry()
try: try:
img = ImageGrab.grab(bbox=(x, y, x + w, y + h)) img = ImageGrab.grab(bbox=(x, y, x + w, y + h))
send_image_data(img) buffer = BytesIO()
img.save(buffer, format="PNG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
sio.emit('screenshot', {
'agent_id': AGENT_ID,
'image_base64': encoded_image
})
except Exception as e: except Exception as e:
print(f"[ERROR] Screenshot error: {e}") print(f"[ERROR] Screenshot error: {e}")
time.sleep(current_interval / 1000) time.sleep(current_interval / 1000)
def heartbeat_loop(): # ---------------- UI Launcher ----------------
""" class RegionLauncher(QtCore.QObject):
Heartbeat every 10 seconds if connected. trigger = QtCore.pyqtSignal(int, int, int, int)
If it fails, we set IS_CONNECTED=False, and rely on config_loop to reconnect.
"""
while True:
send_heartbeat()
time.sleep(10)
def config_loop(): def __init__(self):
""" super().__init__()
1) Reconnect (if needed) until the agent can contact Borealis self.trigger.connect(self.handle)
2) Poll for config. If new config is different from LAST_CONFIG, re-provision
3) If poll_for_config fails or we see connection issues, set IS_CONNECTED=False
and loop back to reconnect() on next iteration
"""
global capture_thread_started, heartbeat_thread_started
global current_interval, overlay_visible, LAST_CONFIG, IS_CONNECTED
while True: def handle(self, x, y, w, h):
# If we aren't connected, reconnect launch_region(x, y, w, h)
if not IS_CONNECTED:
reconnect()
# Attempt to get config region_launcher = None
config = poll_for_config()
if config is None:
# This means we had a poll failure, so mark disconnected and retry.
IS_CONNECTED = False
continue
# If it has a "task" : "screenshot"
if config.get("task") == "screenshot":
# Compare to last known config
if config != LAST_CONFIG:
# Something changed, so provision
print("[PROVISIONING] Agent Provisioning Command Issued by Borealis")
x = config.get("x", 100)
y = config.get("y", 100)
w = config.get("w", 300)
h = config.get("h", 200)
current_interval = config.get("interval", 1000)
overlay_visible = config.get("visible", True)
print('[PROVISIONING] Agent Configured as "Screenshot" Collector')
print(f'[PROVISIONING] Polling Rate: {current_interval / 1000:.1f}s')
# Show or move region widget
if not region_widget:
region_launcher.trigger.emit(x, y, w, h)
else:
region_widget.setGeometry(x, y, w, h)
region_widget.setVisible(overlay_visible)
LAST_CONFIG = config
# Make sure capture thread is started
if not capture_thread_started:
threading.Thread(target=capture_loop, daemon=True).start()
capture_thread_started = True
# Make sure heartbeat thread is started
if not heartbeat_thread_started:
threading.Thread(target=heartbeat_loop, daemon=True).start()
heartbeat_thread_started = True
# Signal that provisioning is done so capture thread can run
config_ready.set()
# Sleep before next poll
time.sleep(CONFIG_POLL_INTERVAL)
def launch_region(x, y, w, h): def launch_region(x, y, w, h):
"""
Initializes the screenshot region overlay widget exactly once.
"""
global region_widget global region_widget
if region_widget: if region_widget:
return return
print("[INFO] Agent Starting...")
region_widget = ScreenshotRegion(x, y, w, h) region_widget = ScreenshotRegion(x, y, w, h)
region_widget.show() region_widget.show()
@ -313,8 +172,8 @@ if __name__ == "__main__":
app_instance = QtWidgets.QApplication(sys.argv) app_instance = QtWidgets.QApplication(sys.argv)
region_launcher = RegionLauncher() region_launcher = RegionLauncher()
# Start the config loop in a background thread sio.connect(SERVER_URL)
threading.Thread(target=config_loop, daemon=True).start()
threading.Thread(target=capture_loop, daemon=True).start()
# Enter Qt main event loop
sys.exit(app_instance.exec_()) sys.exit(app_instance.exec_())

View File

@ -1,41 +1,38 @@
import React, { useEffect, useState } from "react"; import React, { useEffect, useState, useRef } from "react";
import { Handle, Position, useReactFlow } from "reactflow"; import { Handle, Position, useReactFlow } from "reactflow";
import { io } from "socket.io-client";
const socket = io();
const APINode = ({ id, data }) => { const APINode = ({ id, data }) => {
const { setNodes } = useReactFlow(); const { setNodes } = useReactFlow();
const [agents, setAgents] = useState([]); const [agents, setAgents] = useState([]);
const [selectedAgent, setSelectedAgent] = useState(data.agent_id || ""); const [selectedAgent, setSelectedAgent] = useState(data.agent_id || "");
const [selectedType, setSelectedType] = useState(data.data_type || "screenshot"); const [selectedType, setSelectedType] = useState(data.data_type || "screenshot");
const [imageData, setImageData] = useState("");
const [intervalMs, setIntervalMs] = useState(data.interval || 1000); const [intervalMs, setIntervalMs] = useState(data.interval || 1000);
const [paused, setPaused] = useState(false); const [paused, setPaused] = useState(false);
const [overlayVisible, setOverlayVisible] = useState(true); const [overlayVisible, setOverlayVisible] = useState(true);
const [imageData, setImageData] = useState("");
// Refresh agents every 5s
useEffect(() => { useEffect(() => {
const fetchAgents = () => fetch("/api/agents").then(res => res.json()).then(setAgents); fetch("/api/agents").then(res => res.json()).then(setAgents);
fetchAgents(); const interval = setInterval(() => {
const interval = setInterval(fetchAgents, 5000); fetch("/api/agents").then(res => res.json()).then(setAgents);
}, 5000);
return () => clearInterval(interval); return () => clearInterval(interval);
}, []); }, []);
// Pull image if agent provisioned
useEffect(() => { useEffect(() => {
if (!selectedAgent || paused) return; socket.on('new_screenshot', (data) => {
const interval = setInterval(() => { if (data.agent_id === selectedAgent) {
fetch(`/api/agent/image?agent_id=${selectedAgent}`) setImageData(data.image_base64);
.then(res => res.json()) window.BorealisValueBus = window.BorealisValueBus || {};
.then(json => { window.BorealisValueBus[id] = data.image_base64;
if (json.image_base64) { }
setImageData(json.image_base64); });
window.BorealisValueBus = window.BorealisValueBus || {};
window.BorealisValueBus[id] = json.image_base64; return () => socket.off('new_screenshot');
} }, [selectedAgent, id]);
})
.catch(() => { });
}, intervalMs);
return () => clearInterval(interval);
}, [selectedAgent, id, paused, intervalMs]);
const provisionAgent = () => { const provisionAgent = () => {
if (!selectedAgent) return; if (!selectedAgent) return;
@ -53,16 +50,12 @@ const APINode = ({ id, data }) => {
task: selectedType task: selectedType
}) })
}).then(() => { }).then(() => {
socket.emit('request_config', { agent_id: selectedAgent });
setNodes(nds => setNodes(nds =>
nds.map(n => n.id === id nds.map(n => n.id === id
? { ? {
...n, ...n,
data: { data: { agent_id: selectedAgent, data_type: selectedType, interval: intervalMs }
...n.data,
agent_id: selectedAgent,
data_type: selectedType,
interval: intervalMs
}
} }
: n : n
) )
@ -73,13 +66,7 @@ const APINode = ({ id, data }) => {
const toggleOverlay = () => { const toggleOverlay = () => {
const newVisibility = !overlayVisible; const newVisibility = !overlayVisible;
setOverlayVisible(newVisibility); setOverlayVisible(newVisibility);
if (selectedAgent) { provisionAgent();
fetch("/api/agent/overlay_visibility", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ agent_id: selectedAgent, visible: newVisibility })
});
}
}; };
return ( return (
@ -154,7 +141,7 @@ const APINode = ({ id, data }) => {
export default { export default {
type: "API_Data_Collector", type: "API_Data_Collector",
label: "API Data Collector", label: "API Data Collector",
description: "Connects to a remote agent via API and collects data such as screenshots, OCR results, and more.", description: "Real-time provisioning and image collection via WebSocket.",
content: "Publishes agent-collected data into the workflow ValueBus.", content: "Publishes real-time agent-collected data into the workflow.",
component: APINode component: APINode
}; };

View File

@ -1,4 +1,5 @@
from flask import Flask, request, jsonify, send_from_directory from flask import Flask, request, jsonify, send_from_directory
from flask_socketio import SocketIO, emit
import time import time
import os import os
@ -10,6 +11,7 @@ if not os.path.exists(build_folder):
print("WARNING: web-interface build folder not found. Please build your React app.") print("WARNING: web-interface build folder not found. Please build your React app.")
app = Flask(__name__, static_folder=build_folder, static_url_path="/") app = Flask(__name__, static_folder=build_folder, static_url_path="/")
socketio = SocketIO(app, cors_allowed_origins="*") # Allow cross-origin WebSocket connections
@app.route("/") @app.route("/")
def serve_index(): def serve_index():
@ -18,7 +20,6 @@ def serve_index():
return send_from_directory(build_folder, "index.html") return send_from_directory(build_folder, "index.html")
return "<h1>Borealis React App Code Not Found</h1><p>Please re-deploy Borealis Workflow Automation Tool</p>", 404 return "<h1>Borealis React App Code Not Found</h1><p>Please re-deploy Borealis Workflow Automation Tool</p>", 404
# Wildcard route to serve React for sub-routes (e.g., /workflow)
@app.route("/<path:path>") @app.route("/<path:path>")
def serve_react_app(path): def serve_react_app(path):
full_path = os.path.join(build_folder, path) full_path = os.path.join(build_folder, path)
@ -27,12 +28,13 @@ def serve_react_app(path):
return send_from_directory(build_folder, "index.html") return send_from_directory(build_folder, "index.html")
# --------------------------------------------- # ---------------------------------------------
# Borealis Agent API Endpoints # Borealis Agent Management (Hybrid: API + WebSockets)
# --------------------------------------------- # ---------------------------------------------
registered_agents = {} registered_agents = {}
agent_configurations = {} agent_configurations = {}
latest_images = {} latest_images = {}
# API Endpoints (kept for provisioning and status)
@app.route("/api/agent/checkin", methods=["POST"]) @app.route("/api/agent/checkin", methods=["POST"])
def agent_checkin(): def agent_checkin():
data = request.json data = request.json
@ -47,68 +49,75 @@ def agent_checkin():
} }
return jsonify({"status": "ok"}) return jsonify({"status": "ok"})
@app.route("/api/agent/reset", methods=["POST"])
def reset_agent():
agent_id = request.json.get("agent_id")
if agent_id in agents:
agents[agent_id]["status"] = "orphaned"
agents[agent_id]["config"] = None
latest_images.pop(agent_id, None)
return jsonify({"status": "reset"}), 200
return jsonify({"error": "Agent not found"}), 404
@app.route("/api/agent/provision", methods=["POST"]) @app.route("/api/agent/provision", methods=["POST"])
def provision_agent(): def provision_agent():
data = request.json data = request.json
agent_id = data.get("agent_id") agent_id = data.get("agent_id")
config = { config = {
"task": "screenshot", "task": data.get("task", "screenshot"),
"x": data.get("x", 100), "x": data.get("x", 100),
"y": data.get("y", 100), "y": data.get("y", 100),
"w": data.get("w", 300), "w": data.get("w", 300),
"h": data.get("h", 200), "h": data.get("h", 200),
"interval": data.get("interval", 1000) "interval": data.get("interval", 1000),
"visible": data.get("visible", True)
} }
agent_configurations[agent_id] = config agent_configurations[agent_id] = config
if agent_id in registered_agents: if agent_id in registered_agents:
registered_agents[agent_id]["status"] = "provisioned" registered_agents[agent_id]["status"] = "provisioned"
return jsonify({"status": "provisioned"}) return jsonify({"status": "provisioned"})
@app.route("/api/agent/config")
def get_agent_config():
agent_id = request.args.get("agent_id")
config = agent_configurations.get(agent_id)
return jsonify(config or {})
@app.route("/api/agent/data", methods=["POST"])
def agent_data():
data = request.json
agent_id = data.get("agent_id")
image = data.get("image_base64")
if not agent_id or not image:
return jsonify({"error": "Missing data"}), 400
latest_images[agent_id] = {
"image_base64": image,
"timestamp": time.time()
}
return jsonify({"status": "received"})
@app.route("/api/agent/image")
def get_latest_image():
agent_id = request.args.get("agent_id")
entry = latest_images.get(agent_id)
if entry:
return jsonify(entry)
return jsonify({"error": "No image"}), 404
@app.route("/api/agents") @app.route("/api/agents")
def get_agents(): def get_agents():
return jsonify(registered_agents) return jsonify(registered_agents)
# WebSocket Handlers
@socketio.on('connect_agent')
def handle_agent_connect(data):
agent_id = data.get('agent_id')
hostname = data.get('hostname', 'unknown')
registered_agents[agent_id] = {
"agent_id": agent_id,
"hostname": hostname,
"last_seen": time.time(),
"status": "connected"
}
print(f"Agent connected: {agent_id}")
emit('agent_connected', {'status': 'connected'})
@socketio.on('screenshot')
def handle_screenshot(data):
agent_id = data.get('agent_id')
image_base64 = data.get('image_base64')
if agent_id and image_base64:
latest_images[agent_id] = {
'image_base64': image_base64,
'timestamp': time.time()
}
# Real-time broadcast to connected dashboards
emit('new_screenshot', {
'agent_id': agent_id,
'image_base64': image_base64
}, broadcast=True)
emit('screenshot_received', {'status': 'ok'})
print(f"Screenshot received from agent: {agent_id}")
else:
emit('error', {'message': 'Invalid screenshot data'})
@socketio.on('request_config')
def handle_request_config(data):
agent_id = data.get('agent_id')
config = agent_configurations.get(agent_id, {})
emit('agent_config', config)
# --------------------------------------------- # ---------------------------------------------
# Server Start # Server Start
# --------------------------------------------- # ---------------------------------------------
if __name__ == "__main__": if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False) socketio.run(app, host="0.0.0.0", port=5000, debug=False)

View File

@ -52,4 +52,43 @@ The launch script will:
## 🧠 How It Works ## 🧠 How It Works
Borealis workflows run on **live data propagation**. Each node checks for incoming values (via edges) and processes them on a recurring timer (default: 200ms). This allows for highly reactive, composable logic graphs. Borealis workflows run on **live data propagation**. Each node checks for incoming values (via edges) and processes them on a recurring timer (default: 200ms). This allows for highly reactive, composable logic graphs.
## ⚡Reverse Proxy Configuration
If you want to run Borealis behind a reverse proxy (e.g., Traefik), you can set up the following dynamic configuration:
```yml
http:
routers:
borealis:
entryPoints:
- websecure
tls:
certResolver: letsencrypt
service: borealis
rule: "Host(`borealis.bunny-lab.io`) && PathPrefix(`/`)"
middlewares:
- cors-headers
middlewares:
cors-headers:
headers:
accessControlAllowOriginList:
- "*"
accessControlAllowMethods:
- GET
- POST
- OPTIONS
accessControlAllowHeaders:
- Content-Type
- Upgrade
- Connection
accessControlMaxAge: 100
addVaryHeader: true
services:
borealis:
loadBalancer:
servers:
- url: "http://192.168.3.254:5000"
passHostHeader: true
```