Resolved User-Created Script Execution Issues

This commit is contained in:
2025-11-13 23:43:00 -07:00
parent d19e494dbc
commit 0f46ec5d69
2 changed files with 88 additions and 9 deletions

View File

@@ -14,8 +14,9 @@ import datetime as _dt
import hashlib
import json
import logging
import re
import uuid
from typing import Any, Dict, List, Mapping, Optional, Union
from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Union
from ...assembly_management.bootstrap import AssemblyCache
from ...assembly_management.models import AssemblyDomain, AssemblyRecord, CachedAssembly, PayloadType
@@ -80,14 +81,11 @@ class AssemblyRuntimeService:
except Exception:
entries = []
for entry in entries:
metadata = entry.record.metadata or {}
candidate = _normalize_source_path(metadata.get("source_path"))
if not candidate:
continue
if candidate.lower() != lookup_key:
continue
payload_text = self._read_payload_text(entry.record.assembly_guid) if include_payload else None
return self._serialize_entry(entry, include_payload=include_payload, payload_text=payload_text)
for candidate in _iter_source_paths(entry.record):
if candidate.lower() != lookup_key:
continue
payload_text = self._read_payload_text(entry.record.assembly_guid) if include_payload else None
return self._serialize_entry(entry, include_payload=include_payload, payload_text=payload_text)
return None
def export_assembly(self, assembly_guid: str) -> Dict[str, Any]:
@@ -376,6 +374,77 @@ def _normalize_source_path(value: Any) -> str:
return "/".join(segments)
def _iter_source_paths(record: AssemblyRecord) -> Iterable[str]:
"""Yield canonical source paths for the provided assembly record."""
metadata = record.metadata or {}
seen: Set[str] = set()
for key in (
"source_path",
"rel_path",
"legacy_path",
"legacy_rel_path",
"path",
"relative_path",
"script_path",
"playbook_path",
"workflow_path",
):
candidate = _normalize_source_path(metadata.get(key))
if not candidate:
continue
lowered = candidate.lower()
if lowered in seen:
continue
seen.add(lowered)
yield candidate
fallback = _fallback_source_path(record)
if fallback:
lowered = fallback.lower()
if lowered not in seen:
seen.add(lowered)
yield fallback
def _fallback_source_path(record: AssemblyRecord) -> str:
metadata = record.metadata or {}
prefix = _kind_prefix(record.assembly_kind, record.assembly_type)
fallback_name = (
metadata.get("display_name")
or record.display_name
or metadata.get("name")
or record.summary
or record.assembly_guid
or "Assembly"
)
safe_name = _sanitize_name_for_path(fallback_name)
candidate = f"{prefix}/{safe_name}"
return _normalize_source_path(candidate)
def _kind_prefix(kind: Optional[str], assembly_type: Optional[str]) -> str:
key = (kind or "").strip().lower()
type_key = (assembly_type or "").strip().lower()
if key == "ansible" or type_key == "ansible":
return "Ansible_Playbooks"
if key == "workflow" or type_key == "workflow":
return "Workflows"
return "Scripts"
_PATH_SANITIZE_PATTERN = re.compile(r"[^A-Za-z0-9._-]+")
def _sanitize_name_for_path(value: Any, fallback: str = "Assembly") -> str:
text = str(value or "").strip()
if not text:
return fallback
sanitized = _PATH_SANITIZE_PATTERN.sub("_", text)
sanitized = sanitized.strip()
return sanitized or fallback
def _serialize_payload(value: Any) -> str:
if isinstance(value, (dict, list)):
return json.dumps(value, indent=2, sort_keys=True)

View File

@@ -150,3 +150,13 @@ def test_cache_flush_waits_for_locked_database(assembly_runtime) -> None:
records = db_manager.load_all(AssemblyDomain.USER)
summaries = {entry.assembly_guid: entry.summary for entry in records}
assert summaries[guid] == "Updated summary after lock."
def test_resolve_document_matches_virtual_path(assembly_runtime) -> None:
service, _cache, _db_manager = assembly_runtime
payload = _script_payload(display_name="User Created Script")
record = service.create_assembly(payload)
resolved = service.resolve_document_by_source_path("Scripts/User_Created_Script")
assert resolved is not None
assert resolved["assembly_guid"] == record["assembly_guid"]