Restructured Scripts and Workflows into "Assemblies"

This commit is contained in:
2025-09-28 23:26:34 -06:00
parent 6c0a01b175
commit 484540b602
15 changed files with 101 additions and 56 deletions

View File

@@ -513,7 +513,7 @@ def move_workflow():
rel_path = (data.get("path") or "").strip()
new_rel = (data.get("new_path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
old_abs = os.path.abspath(os.path.join(workflows_root, rel_path))
new_abs = os.path.abspath(os.path.join(workflows_root, new_rel))
@@ -534,7 +534,7 @@ def delete_workflow():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
if not abs_path.startswith(workflows_root) or not os.path.isfile(abs_path):
@@ -551,7 +551,7 @@ def delete_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
if not abs_path.startswith(workflows_root) or not os.path.isdir(abs_path):
@@ -567,7 +567,7 @@ def create_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
if not abs_path.startswith(workflows_root):
@@ -585,7 +585,7 @@ def rename_folder():
rel_path = (data.get("path") or "").strip()
new_name = (data.get("new_name") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
old_abs = os.path.abspath(os.path.join(workflows_root, rel_path))
if not old_abs.startswith(workflows_root) or not os.path.isdir(old_abs):
@@ -628,11 +628,11 @@ def _extract_tab_name(obj: Dict) -> str:
@app.route("/api/storage/load_workflows", methods=["GET"])
def load_workflows():
"""
Scan <ProjectRoot>/Workflows for *.json files and return a table-friendly list.
Scan <ProjectRoot>/Assemblies/Workflows for *.json files and return a table-friendly list.
"""
# Resolve <ProjectRoot>/Workflows relative to this file at <ProjectRoot>/Data/server.py
# Resolve <ProjectRoot>/Assemblies/Workflows relative to this file at <ProjectRoot>/Data/server.py
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
results: List[Dict] = []
folders: List[str] = []
@@ -695,7 +695,7 @@ def load_workflow():
"""Load a single workflow JSON by its relative path."""
rel_path = request.args.get("path", "")
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
abs_path = os.path.abspath(os.path.join(workflows_root, rel_path))
@@ -716,7 +716,7 @@ def save_workflow():
return jsonify({"error": "Invalid payload"}), 400
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
os.makedirs(workflows_root, exist_ok=True)
@@ -749,7 +749,7 @@ def rename_workflow():
rel_path = (data.get("path") or "").strip()
new_name = (data.get("new_name") or "").strip()
workflows_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Workflows")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies", "Workflows")
)
old_abs = os.path.abspath(os.path.join(workflows_root, rel_path))
if not old_abs.startswith(workflows_root) or not os.path.isfile(old_abs):
@@ -780,10 +780,26 @@ def rename_workflow():
# Scripts Storage API Endpoints
# ---------------------------------------------
def _scripts_root() -> str:
# Scripts live under Assemblies. We unify listing under Assemblies and
# only allow access within top-level folders: "Scripts" and "Ansible Playbooks".
return os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "Scripts")
os.path.join(os.path.dirname(__file__), "..", "..", "Assemblies")
)
def _scripts_allowed_top_levels() -> List[str]:
# Scripts API is scoped strictly to the Scripts top-level.
return ["Scripts"]
def _is_valid_scripts_relpath(rel_path: str) -> bool:
try:
p = (rel_path or "").replace("\\", "/").lstrip("/")
if not p:
return False
top = p.split("/", 1)[0]
return top in _scripts_allowed_top_levels()
except Exception:
return False
def _detect_script_type(filename: str) -> str:
fn = (filename or "").lower()
@@ -813,7 +829,7 @@ def _ext_for_type(script_type: str) -> str:
@app.route("/api/scripts/list", methods=["GET"])
def list_scripts():
"""Scan <ProjectRoot>/Scripts for known script files and return list + folders."""
"""Scan <ProjectRoot>/Assemblies/Scripts for script files and return list + folders."""
scripts_root = _scripts_root()
results: List[Dict] = []
folders: List[str] = []
@@ -826,35 +842,39 @@ def list_scripts():
}), 200
exts = (".yml", ".ps1", ".bat", ".sh")
for root, dirs, files in os.walk(scripts_root):
rel_root = os.path.relpath(root, scripts_root)
if rel_root != ".":
folders.append(rel_root.replace(os.sep, "/"))
for fname in files:
if not fname.lower().endswith(exts):
continue
for top in _scripts_allowed_top_levels():
base_dir = os.path.join(scripts_root, top)
if not os.path.isdir(base_dir):
continue
for root, dirs, files in os.walk(base_dir):
rel_root = os.path.relpath(root, scripts_root)
if rel_root != ".":
folders.append(rel_root.replace(os.sep, "/"))
for fname in files:
if not fname.lower().endswith(exts):
continue
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, scripts_root)
parts = rel_path.split(os.sep)
folder_parts = parts[:-1]
breadcrumb_prefix = " > ".join(folder_parts) if folder_parts else ""
display_name = f"{breadcrumb_prefix} > {fname}" if breadcrumb_prefix else fname
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, scripts_root)
parts = rel_path.split(os.sep)
folder_parts = parts[:-1]
breadcrumb_prefix = " > ".join(folder_parts) if folder_parts else ""
display_name = f"{breadcrumb_prefix} > {fname}" if breadcrumb_prefix else fname
try:
mtime = os.path.getmtime(full_path)
except Exception:
mtime = 0.0
try:
mtime = os.path.getmtime(full_path)
except Exception:
mtime = 0.0
results.append({
"name": display_name,
"breadcrumb_prefix": breadcrumb_prefix,
"file_name": fname,
"rel_path": rel_path.replace(os.sep, "/"),
"type": _detect_script_type(fname),
"last_edited": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(mtime)),
"last_edited_epoch": mtime
})
results.append({
"name": display_name,
"breadcrumb_prefix": breadcrumb_prefix,
"file_name": fname,
"rel_path": rel_path.replace(os.sep, "/"),
"type": _detect_script_type(fname),
"last_edited": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(mtime)),
"last_edited_epoch": mtime
})
results.sort(key=lambda x: x.get("last_edited_epoch", 0.0), reverse=True)
@@ -870,7 +890,7 @@ def load_script():
rel_path = request.args.get("path", "")
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root) or not os.path.isfile(abs_path):
if (not abs_path.startswith(scripts_root)) or (not _is_valid_scripts_relpath(rel_path)) or (not os.path.isfile(abs_path)):
return jsonify({"error": "Script not found"}), 404
try:
with open(abs_path, "r", encoding="utf-8", errors="replace") as fh:
@@ -908,6 +928,8 @@ def save_script():
if desired_ext:
rel_path = base + desired_ext
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not _is_valid_scripts_relpath(rel_path):
return jsonify({"error": "Invalid path (must be under 'Scripts')"}), 400
else:
if not name:
return jsonify({"error": "Missing name"}), 400
@@ -916,7 +938,10 @@ def save_script():
if not ext:
desired_ext = _ext_for_type(script_type) or ".txt"
name = os.path.splitext(name)[0] + desired_ext
abs_path = os.path.abspath(os.path.join(scripts_root, os.path.basename(name)))
# Default top-level folder is Scripts only (Playbooks handled separately)
if (script_type or "").lower() == "ansible":
return jsonify({"error": "Ansible playbooks are managed separately from scripts."}), 400
abs_path = os.path.abspath(os.path.join(scripts_root, "Scripts", os.path.basename(name)))
if not abs_path.startswith(scripts_root):
return jsonify({"error": "Invalid path"}), 400
@@ -967,7 +992,7 @@ def move_script_file():
new_abs = os.path.abspath(os.path.join(scripts_root, new_rel))
if not old_abs.startswith(scripts_root) or not os.path.isfile(old_abs):
return jsonify({"error": "File not found"}), 404
if not new_abs.startswith(scripts_root):
if (not new_abs.startswith(scripts_root)) or (not _is_valid_scripts_relpath(new_rel)):
return jsonify({"error": "Invalid destination"}), 400
os.makedirs(os.path.dirname(new_abs), exist_ok=True)
try:
@@ -983,7 +1008,7 @@ def delete_script_file():
rel_path = (data.get("path") or "").strip()
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root) or not os.path.isfile(abs_path):
if (not abs_path.startswith(scripts_root)) or (not _is_valid_scripts_relpath(rel_path)) or (not os.path.isfile(abs_path)):
return jsonify({"error": "File not found"}), 404
try:
os.remove(abs_path)
@@ -997,6 +1022,10 @@ def scripts_create_folder():
data = request.get_json(silent=True) or {}
rel_path = (data.get("path") or "").strip()
scripts_root = _scripts_root()
# If caller provided a path that does not include a valid top-level,
# default to creating under the "Scripts" top-level for convenience.
if not _is_valid_scripts_relpath(rel_path):
rel_path = os.path.join("Scripts", rel_path) if rel_path else "Scripts"
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root):
return jsonify({"error": "Invalid path"}), 400
@@ -1013,8 +1042,11 @@ def scripts_delete_folder():
rel_path = (data.get("path") or "").strip()
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root) or not os.path.isdir(abs_path):
if (not abs_path.startswith(scripts_root)) or (not _is_valid_scripts_relpath(rel_path)) or (not os.path.isdir(abs_path)):
return jsonify({"error": "Folder not found"}), 404
rel_norm = (rel_path or "").replace("\\", "/").strip("/")
if rel_norm in ("Scripts", "Ansible Playbooks"):
return jsonify({"error": "Cannot delete top-level folder"}), 400
try:
shutil.rmtree(abs_path)
return jsonify({"status": "ok"})
@@ -1033,6 +1065,9 @@ def scripts_rename_folder():
return jsonify({"error": "Folder not found"}), 404
if not new_name:
return jsonify({"error": "Invalid new_name"}), 400
rel_norm = (rel_path or "").replace("\\", "/").strip("/")
if rel_norm in ("Scripts", "Ansible Playbooks"):
return jsonify({"error": "Cannot rename top-level folder"}), 400
new_abs = os.path.join(os.path.dirname(old_abs), new_name)
try:
os.rename(old_abs, new_abs)
@@ -2138,7 +2173,7 @@ def scripts_quick_run():
scripts_root = _scripts_root()
abs_path = os.path.abspath(os.path.join(scripts_root, rel_path))
if not abs_path.startswith(scripts_root) or not os.path.isfile(abs_path):
if (not abs_path.startswith(scripts_root)) or (not _is_valid_scripts_relpath(rel_path)) or (not os.path.isfile(abs_path)):
return jsonify({"error": "Script not found"}), 404
script_type = _detect_script_type(abs_path)