Initial Script Editing Functionality

This commit is contained in:
2025-09-03 03:03:27 -06:00
parent 5bc6e1104a
commit 0d5ba0056f
3 changed files with 956 additions and 2 deletions

View File

@@ -15,6 +15,7 @@ import json # For reading workflow JSON files
import shutil # For moving workflow files and folders
from typing import List, Dict
import sqlite3
import io
# Borealis Python API Endpoints
from Python_API_Endpoints.ocr_engines import run_ocr_on_base64
@@ -371,6 +372,266 @@ def rename_workflow():
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---------------------------------------------
# Scripts Storage API Endpoints
# ---------------------------------------------
def _scripts_root() -> str:
return os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Scripts")
)
def _detect_script_type(filename: str) -> str:
fn = (filename or "").lower()
if fn.endswith(".yml"):
return "ansible"
if fn.endswith(".ps1"):
return "powershell"
if fn.endswith(".bat"):
return "batch"
if fn.endswith(".sh"):
return "bash"
return "unknown"
def _ext_for_type(script_type: str) -> str:
t = (script_type or "").lower()
if t == "ansible":
return ".yml"
if t == "powershell":
return ".ps1"
if t == "batch":
return ".bat"
if t == "bash":
return ".sh"
return ""
@app.route("/api/scripts/list", methods=["GET"])
def list_scripts():
"""Scan <ProjectRoot>/Scripts for known script files and return list + folders."""
scripts_root = _scripts_root()
results: List[Dict] = []
folders: List[str] = []
if not os.path.isdir(scripts_root):
return jsonify({
"root": scripts_root,
"scripts": [],
"folders": []
}), 200
exts = (".yml", ".ps1", ".bat", ".sh")
for root, dirs, files in os.walk(scripts_root):
rel_root = os.path.relpath(root, scripts_root)
if rel_root != ".":
folders.append(rel_root.replace(os.sep, "/"))
for fname in files:
if not fname.lower().endswith(exts):
continue
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, scripts_root)
parts = rel_path.split(os.sep)
folder_parts = parts[:-1]
breadcrumb_prefix = " > ".join(folder_parts) if folder_parts else ""
display_name = f"{breadcrumb_prefix} > {fname}" if breadcrumb_prefix else fname
try:
mtime = os.path.getmtime(full_path)
except Exception:
mtime = 0.0
results.append({
"name": display_name,
"breadcrumb_prefix": breadcrumb_prefix,
"file_name": fname,
"rel_path": rel_path.replace(os.sep, "/"),
"type": _detect_script_type(fname),
"last_edited": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(mtime)),
"last_edited_epoch": mtime
})
results.sort(key=lambda x: x.get("last_edited_epoch", 0.0), reverse=True)
return jsonify({
"root": scripts_root,
"scripts": results,
"folders": folders
})
@app.route("/api/scripts/load", methods=["GET"])
def load_script():
rel_path = request.args.get("path", "")
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root) or not os.path.isfile(abs_path):
return jsonify({"error": "Script not found"}), 404
try:
with open(abs_path, "r", encoding="utf-8", errors="replace") as fh:
content = fh.read()
return jsonify({
"file_name": os.path.basename(abs_path),
"rel_path": os.path.relpath(abs_path, scripts_root).replace(os.sep, "/"),
"type": _detect_script_type(abs_path),
"content": content
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/save", methods=["POST"])
def save_script():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
name = (data.get("name") or "").strip()
content = data.get("content")
script_type = (data.get("type") or "").strip().lower()
if content is None:
return jsonify({"error": "Missing content"}), 400
scripts_root = _scripts_root()
os.makedirs(scripts_root, exist_ok=True)
# Determine target path
if rel_path:
# Ensure extension matches type if provided
base, ext = os.path.splitext(rel_path)
desired_ext = _ext_for_type(script_type) or ext
if desired_ext and not rel_path.lower().endswith(desired_ext):
rel_path = base + desired_ext
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
else:
if not name:
return jsonify({"error": "Missing name"}), 400
desired_ext = _ext_for_type(script_type) or os.path.splitext(name)[1] or ".txt"
if not name.lower().endswith(desired_ext):
name = os.path.splitext(name)[0] + desired_ext
abs_path = os.path.abspath(os.path.join(scripts_root, os.path.basename(name)))
if not abs_path.startswith(scripts_root):
return jsonify({"error": "Invalid path"}), 400
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
try:
with open(abs_path, "w", encoding="utf-8", newline="\n") as fh:
fh.write(str(content))
rel_new = os.path.relpath(abs_path, scripts_root).replace(os.sep, "/")
return jsonify({"status": "ok", "rel_path": rel_new})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/rename_file", methods=["POST"])
def rename_script_file():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
new_name = (data.get("new_name") or "").strip()
script_type = (data.get("type") or "").strip().lower()
scripts_root = _scripts_root()
old_abs = os.path.abspath(os.path.join(scripts_root, rel_path))
if not old_abs.startswith(scripts_root) or not os.path.isfile(old_abs):
return jsonify({"error": "File not found"}), 404
if not new_name:
return jsonify({"error": "Invalid new_name"}), 400
desired_ext = _ext_for_type(script_type) or os.path.splitext(new_name)[1]
if desired_ext and not new_name.lower().endswith(desired_ext):
new_name = os.path.splitext(new_name)[0] + desired_ext
new_abs = os.path.join(os.path.dirname(old_abs), os.path.basename(new_name))
try:
os.rename(old_abs, new_abs)
rel_new = os.path.relpath(new_abs, scripts_root).replace(os.sep, "/")
return jsonify({"status": "ok", "rel_path": rel_new})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/move_file", methods=["POST"])
def move_script_file():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
new_rel = (data.get("new_path") or "").strip()
scripts_root = _scripts_root()
old_abs = os.path.abspath(os.path.join(scripts_root, rel_path))
new_abs = os.path.abspath(os.path.join(scripts_root, new_rel))
if not old_abs.startswith(scripts_root) or not os.path.isfile(old_abs):
return jsonify({"error": "File not found"}), 404
if not new_abs.startswith(scripts_root):
return jsonify({"error": "Invalid destination"}), 400
os.makedirs(os.path.dirname(new_abs), exist_ok=True)
try:
shutil.move(old_abs, new_abs)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/delete_file", methods=["POST"])
def delete_script_file():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root) or not os.path.isfile(abs_path):
return jsonify({"error": "File not found"}), 404
try:
os.remove(abs_path)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/create_folder", methods=["POST"])
def scripts_create_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root):
return jsonify({"error": "Invalid path"}), 400
try:
os.makedirs(abs_path, exist_ok=True)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/delete_folder", methods=["POST"])
def scripts_delete_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root) or not os.path.isdir(abs_path):
return jsonify({"error": "Folder not found"}), 404
try:
shutil.rmtree(abs_path)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/rename_folder", methods=["POST"])
def scripts_rename_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
new_name = (data.get("new_name") or "").strip()
scripts_root = _scripts_root()
old_abs = os.path.abspath(os.path.join(scripts_root, rel_path))
if not old_abs.startswith(scripts_root) or not os.path.isdir(old_abs):
return jsonify({"error": "Folder not found"}), 404
if not new_name:
return jsonify({"error": "Invalid new_name"}), 400
new_abs = os.path.join(os.path.dirname(old_abs), new_name)
try:
os.rename(old_abs, new_abs)
return jsonify({"status": "ok"})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---------------------------------------------
# Borealis Agent API Endpoints
# ---------------------------------------------