Files
Borealis-Github-Replica/Data/Server/server.py

719 lines
25 KiB
Python

#////////// PROJECT FILE SEPARATION LINE ////////// CODE AFTER THIS LINE ARE FROM: <ProjectRoot>/Data/Server/server.py
import eventlet
# Monkey-patch stdlib for cooperative sockets
eventlet.monkey_patch()
import requests
from flask import Flask, request, jsonify, Response, send_from_directory, make_response
from flask_socketio import SocketIO, emit
from flask_cors import CORS
import time
import os # To Read Production ReactJS Server Folder
import json # For reading workflow JSON files
import shutil # For moving workflow files and folders
from typing import List, Dict
import sqlite3
# Borealis Python API Endpoints
from Python_API_Endpoints.ocr_engines import run_ocr_on_base64
# ---------------------------------------------
# Flask + WebSocket Server Configuration
# ---------------------------------------------
app = Flask(
__name__,
static_folder=os.path.join(os.path.dirname(__file__), '../web-interface/build'),
static_url_path=''
)
# Enable CORS on All Routes
CORS(app)
socketio = SocketIO(
app,
cors_allowed_origins="*",
async_mode="eventlet",
engineio_options={
'max_http_buffer_size': 100_000_000,
'max_websocket_message_size': 100_000_000
}
)
# ---------------------------------------------
# Serve ReactJS Production Vite Build from dist/
# ---------------------------------------------
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def serve_dist(path):
full_path = os.path.join(app.static_folder, path)
if path and os.path.isfile(full_path):
return send_from_directory(app.static_folder, path)
else:
# SPA entry point
return send_from_directory(app.static_folder, 'index.html')
# ---------------------------------------------
# Health Check Endpoint
# ---------------------------------------------
@app.route("/health")
def health():
return jsonify({"status": "ok"})
# ---------------------------------------------
# User Authentication Endpoint
# ---------------------------------------------
@app.route("/api/users", methods=["GET"])
def get_users():
users_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "users.json")
)
try:
with open(users_path, "r", encoding="utf-8") as fh:
return jsonify(json.load(fh))
except Exception:
return jsonify({"users": []})
# ---------------------------------------------
# Borealis Python API Endpoints
# ---------------------------------------------
# /api/ocr: Accepts a base64 image and OCR engine selection,
# and returns extracted text lines.
@app.route("/api/ocr", methods=["POST"])
def ocr_endpoint():
payload = request.get_json()
image_b64 = payload.get("image_base64")
engine = payload.get("engine", "tesseract").lower().strip()
backend = payload.get("backend", "cpu").lower().strip()
if engine in ["tesseractocr", "tesseract"]:
engine = "tesseract"
elif engine == "easyocr":
engine = "easyocr"
else:
return jsonify({"error": f"OCR engine '{engine}' not recognized."}), 400
try:
lines = run_ocr_on_base64(image_b64, engine=engine, backend=backend)
return jsonify({"lines": lines})
except Exception as e:
return jsonify({"error": str(e)}), 500
# New storage management endpoints
@app.route("/api/storage/move_workflow", methods=["POST"])
def move_workflow():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
new_rel = (data.get("new_path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
old_abs = os.path.abspath(os.path.join(workflows_root, rel_path))
new_abs = os.path.abspath(os.path.join(workflows_root, new_rel))
if not old_abs.startswith(workflows_root) or not os.path.isfile(old_abs):
return jsonify({"error": "Workflow not found"}), 404
if not new_abs.startswith(workflows_root):
return jsonify({"error": "Invalid destination"}), 400
os.makedirs(os.path.dirname(new_abs), exist_ok=True)
try:
shutil.move(old_abs, new_abs)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/storage/delete_workflow", methods=["POST"])
def delete_workflow():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
if not abs_path.startswith(workflows_root) or not os.path.isfile(abs_path):
return jsonify({"error": "Workflow not found"}), 404
try:
os.remove(abs_path)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/storage/delete_folder", methods=["POST"])
def delete_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
if not abs_path.startswith(workflows_root) or not os.path.isdir(abs_path):
return jsonify({"error": "Folder not found"}), 404
try:
shutil.rmtree(abs_path)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/storage/create_folder", methods=["POST"])
def create_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
if not abs_path.startswith(workflows_root):
return jsonify({"error": "Invalid path"}), 400
try:
os.makedirs(abs_path, exist_ok=True)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/storage/rename_folder", methods=["POST"])
def rename_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
new_name = (data.get("new_name") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
old_abs = os.path.abspath(os.path.join(workflows_root, rel_path))
if not old_abs.startswith(workflows_root) or not os.path.isdir(old_abs):
return jsonify({"error": "Folder not found"}), 404
if not new_name:
return jsonify({"error": "Invalid new_name"}), 400
new_abs = os.path.join(os.path.dirname(old_abs), new_name)
try:
os.rename(old_abs, new_abs)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---------------------------------------------
# Borealis Storage API Endpoints
# ---------------------------------------------
def _safe_read_json(path: str) -> Dict:
"""
Try to read JSON safely. Returns {} on failure.
"""
try:
with open(path, "r", encoding="utf-8") as fh:
return json.load(fh)
except Exception:
return {}
def _extract_tab_name(obj: Dict) -> str:
"""
Best-effort extraction of a workflow tab name from a JSON object.
Falls back to empty string when unknown.
"""
if not isinstance(obj, dict):
return ""
for key in ["tabName", "tab_name", "name", "title"]:
val = obj.get(key)
if isinstance(val, str) and val.strip():
return val.strip()
return ""
@app.route("/api/storage/load_workflows", methods=["GET"])
def load_workflows():
"""
Scan <ProjectRoot>/Workflows for *.json files and return a table-friendly list.
"""
# Resolve <ProjectRoot>/Workflows relative to this file at <ProjectRoot>/Data/server.py
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
results: List[Dict] = []
folders: List[str] = []
if not os.path.isdir(workflows_root):
return jsonify({
"root": workflows_root,
"workflows": [],
"warning": "Workflows directory not found."
}), 200
for root, dirs, files in os.walk(workflows_root):
rel_root = os.path.relpath(root, workflows_root)
if rel_root != ".":
folders.append(rel_root.replace(os.sep, "/"))
for fname in files:
if not fname.lower().endswith(".json"):
continue
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, workflows_root)
parts = rel_path.split(os.sep)
folder_parts = parts[:-1]
breadcrumb_prefix = " > ".join(folder_parts) if folder_parts else ""
display_name = f"{breadcrumb_prefix} > {fname}" if breadcrumb_prefix else fname
obj = _safe_read_json(full_path)
tab_name = _extract_tab_name(obj)
try:
mtime = os.path.getmtime(full_path)
except Exception:
mtime = 0.0
last_edited_str = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(mtime))
results.append({
"name": display_name,
"breadcrumb_prefix": breadcrumb_prefix,
"file_name": fname,
"rel_path": rel_path.replace(os.sep, "/"),
"tab_name": tab_name,
"description": "",
"category": "",
"last_edited": last_edited_str,
"last_edited_epoch": mtime
})
results.sort(key=lambda x: x.get("last_edited_epoch", 0.0), reverse=True)
return jsonify({
"root": workflows_root,
"workflows": results,
"folders": folders
})
@app.route("/api/storage/load_workflow", methods=["GET"])
def load_workflow():
"""Load a single workflow JSON by its relative path."""
rel_path = request.args.get("path", "")
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
if not abs_path.startswith(workflows_root) or not os.path.isfile(abs_path):
return jsonify({"error": "Workflow not found"}), 404
obj = _safe_read_json(abs_path)
return jsonify(obj)
@app.route("/api/storage/save_workflow", methods=["POST"])
def save_workflow():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
name = (data.get("name") or "").strip()
workflow = data.get("workflow")
if not isinstance(workflow, dict):
return jsonify({"error": "Invalid payload"}), 400
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
os.makedirs(workflows_root, exist_ok=True)
if rel_path:
if not rel_path.lower().endswith(".json"):
rel_path += ".json"
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
else:
if not name:
return jsonify({"error": "Invalid payload"}), 400
if not name.lower().endswith(".json"):
name += ".json"
abs_path = os.path.abspath(os.path.join(workflows_root, os.path.basename(name)))
if not abs_path.startswith(workflows_root):
return jsonify({"error": "Invalid path"}), 400
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
try:
with open(abs_path, "w", encoding="utf-8") as fh:
json.dump(workflow, fh, indent=2)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/storage/rename_workflow", methods=["POST"])
def rename_workflow():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
new_name = (data.get("new_name") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
)
old_abs = os.path.abspath(os.path.join(workflows_root, rel_path))
if not old_abs.startswith(workflows_root) or not os.path.isfile(old_abs):
return jsonify({"error": "Workflow not found"}), 404
if not new_name:
return jsonify({"error": "Invalid new_name"}), 400
if not new_name.lower().endswith(".json"):
new_name += ".json"
new_abs = os.path.join(os.path.dirname(old_abs), os.path.basename(new_name))
base_name = os.path.splitext(os.path.basename(new_abs))[0]
try:
os.rename(old_abs, new_abs)
obj = _safe_read_json(new_abs)
for k in ["tabName", "tab_name", "name", "title"]:
if k in obj:
obj[k] = base_name
if "tab_name" not in obj:
obj["tab_name"] = base_name
with open(new_abs, "w", encoding="utf-8") as fh:
json.dump(obj, fh, indent=2)
rel_new = os.path.relpath(new_abs, workflows_root).replace(os.sep, "/")
return jsonify({"status": "ok", "rel_path": rel_new})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---------------------------------------------
# Borealis Agent API Endpoints
# ---------------------------------------------
# These endpoints handle agent registration, provisioning, image streaming, and heartbeats.
# Shape expected by UI for each agent:
# { "agent_id", "hostname", "agent_operating_system", "last_seen", "status" }
registered_agents: Dict[str, Dict] = {}
agent_configurations: Dict[str, Dict] = {}
latest_images: Dict[str, Dict] = {}
# Device database initialization
DB_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Databases", "devices.db")
)
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
def init_db():
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute(
"CREATE TABLE IF NOT EXISTS device_details (hostname TEXT PRIMARY KEY, description TEXT, details TEXT)"
)
conn.commit()
conn.close()
init_db()
@app.route("/api/agents")
def get_agents():
"""
Return a dict keyed by agent_id with hostname, os, last_seen, status.
"""
return jsonify(registered_agents)
@app.route("/api/agent/details", methods=["POST"])
def save_agent_details():
data = request.get_json(silent=True) or {}
hostname = data.get("hostname")
details = data.get("details")
if not hostname and isinstance(details, dict):
hostname = details.get("summary", {}).get("hostname")
if not hostname or not isinstance(details, dict):
return jsonify({"error": "invalid payload"}), 400
try:
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute(
"SELECT description FROM device_details WHERE hostname = ?",
(hostname,),
)
row = cur.fetchone()
description = row[0] if row else ""
cur.execute(
"REPLACE INTO device_details (hostname, description, details) VALUES (?, ?, ?)",
(hostname, description, json.dumps(details)),
)
conn.commit()
conn.close()
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/device/details/<hostname>", methods=["GET"])
def get_device_details(hostname: str):
try:
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute(
"SELECT details, description FROM device_details WHERE hostname = ?",
(hostname,),
)
row = cur.fetchone()
conn.close()
if row:
try:
details = json.loads(row[0])
except Exception:
details = {}
description = row[1] if len(row) > 1 else ""
if description:
details.setdefault("summary", {})["description"] = description
return jsonify(details)
except Exception:
pass
return jsonify({})
@app.route("/api/device/description/<hostname>", methods=["POST"])
def set_device_description(hostname: str):
data = request.get_json(silent=True) or {}
description = (data.get("description") or "").strip()
try:
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute(
"INSERT INTO device_details(hostname, description, details) VALUES (?, ?, COALESCE((SELECT details FROM device_details WHERE hostname = ?), '{}')) "
"ON CONFLICT(hostname) DO UPDATE SET description=excluded.description",
(hostname, description, hostname),
)
conn.commit()
conn.close()
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/agent/<agent_id>", methods=["DELETE"])
def delete_agent(agent_id: str):
"""Remove an agent from the in-memory registry."""
if agent_id in registered_agents:
registered_agents.pop(agent_id, None)
agent_configurations.pop(agent_id, None)
return jsonify({"status": "removed"})
return jsonify({"error": "agent not found"}), 404
@app.route("/api/agent/provision", methods=["POST"])
def provision_agent():
data = request.json
agent_id = data.get("agent_id")
roles = data.get("roles", [])
if not agent_id or not isinstance(roles, list):
return jsonify({"error": "Missing agent_id or roles[] in provision payload."}), 400
config = {"roles": roles}
agent_configurations[agent_id] = config
if agent_id in registered_agents:
registered_agents[agent_id]["status"] = "provisioned"
socketio.emit("agent_config", config)
return jsonify({"status": "provisioned", "roles": roles})
# ---------------------------------------------
# Borealis External API Proxy Endpoint
# ---------------------------------------------
@app.route("/api/proxy", methods=["GET", "POST", "OPTIONS"])
def proxy():
target = request.args.get("url")
if not target:
return {"error": "Missing ?url="}, 400
upstream = requests.request(
method = request.method,
url = target,
headers = {k:v for k,v in request.headers if k.lower() != "host"},
data = request.get_data(),
timeout = 10
)
excluded = ["content-encoding","content-length","transfer-encoding","connection"]
headers = [(k,v) for k,v in upstream.raw.headers.items() if k.lower() not in excluded]
resp = make_response(upstream.content, upstream.status_code)
for k,v in headers:
resp.headers[k] = v
return resp
# ---------------------------------------------
# Live Screenshot Viewer for Debugging
# ---------------------------------------------
@app.route("/api/agent/<agent_id>/node/<node_id>/screenshot/live")
def screenshot_node_viewer(agent_id, node_id):
return f"""
<!DOCTYPE html>
<html>
<head>
<title>Borealis Live View - {agent_id}:{node_id}</title>
<style>
body {{
margin: 0;
background-color: #000;
display: flex;
align-items: center;
justify-content: center;
height: 100vh;
}}
canvas {{
border: 1px solid #444;
max-width: 90vw;
max-height: 90vh;
background-color: #111;
}}
</style>
</head>
<body>
<canvas id="viewerCanvas"></canvas>
<script src="https://cdn.socket.io/4.8.1/socket.io.min.js"></script>
<script>
const agentId = "{agent_id}";
const nodeId = "{node_id}";
const canvas = document.getElementById("viewerCanvas");
const ctx = canvas.getContext("2d");
const socket = io(window.location.origin, {{ transports: ["websocket"] }});
socket.on("agent_screenshot_task", (data) => {{
if (data.agent_id !== agentId || data.node_id !== nodeId) return;
const base64 = data.image_base64;
if (!base64 || base64.length < 100) return;
const img = new Image();
img.onload = () => {{
if (canvas.width !== img.width || canvas.height !== img.height) {{
canvas.width = img.width;
canvas.height = img.height;
}}
ctx.clearRect(0, 0, canvas.width, canvas.height);
ctx.drawImage(img, 0, 0);
}};
img.src = "data:image/png;base64," + base64;
}});
</script>
</body>
</html>
"""
# ---------------------------------------------
# WebSocket Events for Real-Time Communication
# ---------------------------------------------
@socketio.on("agent_screenshot_task")
def receive_screenshot_task(data):
agent_id = data.get("agent_id")
node_id = data.get("node_id")
image = data.get("image_base64", "")
if not agent_id or not node_id:
print("[WS] Screenshot task missing agent_id or node_id.")
return
if image:
latest_images[f"{agent_id}:{node_id}"] = {
"image_base64": image,
"timestamp": time.time()
}
emit("agent_screenshot_task", data, broadcast=True)
@socketio.on("connect_agent")
def connect_agent(data):
"""
Initial agent connect. Agent may only send agent_id here.
Hostname/OS are filled in by subsequent heartbeats.
"""
agent_id = (data or {}).get("agent_id")
if not agent_id:
return
print(f"Agent connected: {agent_id}")
rec = registered_agents.setdefault(agent_id, {})
rec["agent_id"] = agent_id
rec["hostname"] = rec.get("hostname", "unknown")
rec["agent_operating_system"] = rec.get("agent_operating_system", "-")
rec["last_seen"] = int(time.time())
rec["status"] = "provisioned" if agent_id in agent_configurations else "orphaned"
@socketio.on("agent_heartbeat")
def on_agent_heartbeat(data):
"""
Heartbeat payload from agent:
{ agent_id, hostname, agent_operating_system, last_seen }
Updates registry so Devices view can display OS/hostname and recency.
"""
if not data:
return
agent_id = data.get("agent_id")
if not agent_id:
return
hostname = data.get("hostname")
if hostname:
for aid, info in list(registered_agents.items()):
if aid != agent_id and info.get("hostname") == hostname:
registered_agents.pop(aid, None)
agent_configurations.pop(aid, None)
rec = registered_agents.setdefault(agent_id, {})
rec["agent_id"] = agent_id
if hostname:
rec["hostname"] = hostname
if data.get("agent_operating_system"):
rec["agent_operating_system"] = data.get("agent_operating_system")
rec["last_seen"] = int(data.get("last_seen") or time.time())
rec["status"] = "provisioned" if agent_id in agent_configurations else rec.get("status", "orphaned")
@socketio.on("request_config")
def send_agent_config(data):
agent_id = (data or {}).get("agent_id")
config = agent_configurations.get(agent_id)
if config:
emit("agent_config", config)
@socketio.on("screenshot")
def receive_screenshot(data):
agent_id = data.get("agent_id")
image = data.get("image_base64")
if agent_id and image:
latest_images[agent_id] = {
"image_base64": image,
"timestamp": time.time()
}
emit("new_screenshot", {"agent_id": agent_id, "image_base64": image}, broadcast=True)
@socketio.on("disconnect")
def on_disconnect():
print("[WebSocket] Connection Disconnected")
# Macro Websocket Handlers
@socketio.on("macro_status")
def receive_macro_status(data):
"""
Receives macro status/errors from agent and relays to all clients
Expected payload: {
"agent_id": ...,
"node_id": ...,
"success": True/False,
"message": "...",
"timestamp": ...
}
"""
print(f"[Macro Status] Agent {data.get('agent_id')} Node {data.get('node_id')} Success: {data.get('success')} Msg: {data.get('message')}")
emit("macro_status", data, broadcast=True)
@socketio.on("list_agent_windows")
def handle_list_agent_windows(data):
"""
Forwards list_agent_windows event to all agents (or filter for a specific agent_id).
"""
emit("list_agent_windows", data, broadcast=True)
@socketio.on("agent_window_list")
def handle_agent_window_list(data):
"""
Relay the list of windows from the agent back to all connected clients.
"""
emit("agent_window_list", data, broadcast=True)
# ---------------------------------------------
# Server Launch
# ---------------------------------------------
if __name__ == "__main__":
# Use SocketIO runner so WebSocket transport works with eventlet.
socketio.run(app, host="0.0.0.0", port=5000)