Persist assemblies as base64 and decode for execution

This commit is contained in:
2025-10-03 21:16:43 -06:00
parent 304c1e9728
commit 211e37c64c
7 changed files with 370 additions and 21 deletions

View File

@@ -689,6 +689,64 @@ def _empty_assembly_document(default_type: str = "powershell") -> Dict[str, Any]
}
def _decode_base64_text(value: Any) -> Optional[str]:
if not isinstance(value, str):
return None
stripped = value.strip()
if not stripped:
return ""
try:
cleaned = re.sub(r"\s+", "", stripped)
except Exception:
cleaned = stripped
try:
decoded = base64.b64decode(cleaned, validate=True)
except Exception:
return None
try:
return decoded.decode("utf-8")
except Exception:
return decoded.decode("utf-8", errors="replace")
def _decode_script_content(value: Any, encoding_hint: str = "") -> str:
encoding = (encoding_hint or "").strip().lower()
if isinstance(value, str):
if encoding in ("base64", "b64", "base-64"):
decoded = _decode_base64_text(value)
if decoded is not None:
return decoded.replace("\r\n", "\n")
decoded = _decode_base64_text(value)
if decoded is not None:
return decoded.replace("\r\n", "\n")
return value.replace("\r\n", "\n")
return ""
def _encode_script_content(script_text: Any) -> str:
if not isinstance(script_text, str):
if script_text is None:
script_text = ""
else:
script_text = str(script_text)
normalized = script_text.replace("\r\n", "\n")
if not normalized:
return ""
encoded = base64.b64encode(normalized.encode("utf-8"))
return encoded.decode("ascii")
def _prepare_assembly_storage(doc: Dict[str, Any]) -> Dict[str, Any]:
stored: Dict[str, Any] = {}
for key, value in (doc or {}).items():
if key == "script":
stored[key] = _encode_script_content(value)
else:
stored[key] = value
stored["script_encoding"] = "base64"
return stored
def _normalize_assembly_document(obj: Any, default_type: str, base_name: str) -> Dict[str, Any]:
doc = _empty_assembly_document(default_type)
if not isinstance(obj, dict):
@@ -703,6 +761,7 @@ def _normalize_assembly_document(obj: Any, default_type: str, base_name: str) ->
if typ in ("powershell", "batch", "bash", "ansible"):
doc["type"] = typ
script_val = obj.get("script")
content_val = obj.get("content")
script_lines = obj.get("script_lines")
if isinstance(script_lines, list):
try:
@@ -712,11 +771,24 @@ def _normalize_assembly_document(obj: Any, default_type: str, base_name: str) ->
elif isinstance(script_val, str):
doc["script"] = script_val
else:
content_val = obj.get("content")
if isinstance(content_val, str):
doc["script"] = content_val
normalized_script = (doc["script"] or "").replace("\r\n", "\n")
doc["script"] = normalized_script
encoding_hint = str(obj.get("script_encoding") or obj.get("scriptEncoding") or "").strip().lower()
doc["script"] = _decode_script_content(doc.get("script"), encoding_hint)
if encoding_hint in ("base64", "b64", "base-64"):
doc["script_encoding"] = "base64"
else:
probe_source = ""
if isinstance(script_val, str) and script_val:
probe_source = script_val
elif isinstance(content_val, str) and content_val:
probe_source = content_val
decoded_probe = _decode_base64_text(probe_source) if probe_source else None
if decoded_probe is not None:
doc["script_encoding"] = "base64"
doc["script"] = decoded_probe.replace("\r\n", "\n")
else:
doc["script_encoding"] = "plain"
timeout_val = obj.get("timeout_seconds", obj.get("timeout"))
if timeout_val is not None:
try:
@@ -853,7 +925,7 @@ def assembly_create():
base_name,
)
with open(abs_path, "w", encoding="utf-8") as fh:
json.dump(normalized, fh, indent=2)
json.dump(_prepare_assembly_storage(normalized), fh, indent=2)
rel_new = os.path.relpath(abs_path, root).replace(os.sep, "/")
return jsonify({"status": "ok", "rel_path": rel_new})
else:
@@ -902,7 +974,7 @@ def assembly_edit():
base_name,
)
with open(target_abs, "w", encoding="utf-8") as fh:
json.dump(normalized, fh, indent=2)
json.dump(_prepare_assembly_storage(normalized), fh, indent=2)
if target_abs != abs_path:
try:
os.remove(abs_path)
@@ -2993,6 +3065,7 @@ def scripts_quick_run():
env_map, variables, literal_lookup = _prepare_variable_context(doc_variables, overrides)
content = _rewrite_powershell_script(content, literal_lookup)
encoded_content = _encode_script_content(content)
timeout_seconds = 0
try:
timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0))
@@ -3034,7 +3107,8 @@ def scripts_quick_run():
"script_type": script_type,
"script_name": _safe_filename(rel_path),
"script_path": rel_path.replace(os.sep, "/"),
"script_content": content,
"script_content": encoded_content,
"script_encoding": "base64",
"environment": env_map,
"variables": variables,
"timeout_seconds": timeout_seconds,
@@ -3070,6 +3144,7 @@ def ansible_quick_run():
return jsonify({"error": "Playbook not found"}), 404
doc = _load_assembly_document(abs_path, 'ansible')
content = doc.get('script') or ''
encoded_content = _encode_script_content(content)
variables = doc.get('variables') if isinstance(doc.get('variables'), list) else []
files = doc.get('files') if isinstance(doc.get('files'), list) else []
@@ -3112,7 +3187,8 @@ def ansible_quick_run():
"run_id": run_id,
"target_hostname": str(host),
"playbook_name": os.path.basename(abs_path),
"playbook_content": content,
"playbook_content": encoded_content,
"playbook_encoding": "base64",
"connection": "winrm",
"variables": variables,
"files": files,