Rewrite PowerShell jobs with substituted variable literals

This commit is contained in:
2025-10-03 19:05:30 -06:00
parent 38863be334
commit 304c1e9728
2 changed files with 204 additions and 77 deletions

View File

@@ -62,6 +62,105 @@ def _expand_env_aliases(env_map: Dict[str, str], variables: List[Dict[str, Any]]
return expanded
def _powershell_literal(value: Any, var_type: str) -> str:
typ = str(var_type or "string").lower()
if typ == "boolean":
if isinstance(value, bool):
truthy = value
elif value is None:
truthy = False
elif isinstance(value, (int, float)):
truthy = value != 0
else:
s = str(value).strip().lower()
if s in {"true", "1", "yes", "y", "on"}:
truthy = True
elif s in {"false", "0", "no", "n", "off", ""}:
truthy = False
else:
truthy = bool(s)
return "$true" if truthy else "$false"
if typ == "number":
if value is None or value == "":
return "0"
return str(value)
s = "" if value is None else str(value)
return "'" + s.replace("'", "''") + "'"
def _extract_variable_default(var: Dict[str, Any]) -> Any:
for key in ("value", "default", "defaultValue", "default_value"):
if key in var:
val = var.get(key)
return "" if val is None else val
return ""
def _prepare_variable_context(doc_variables: List[Dict[str, Any]], overrides: Dict[str, Any]):
env_map: Dict[str, str] = {}
variables: List[Dict[str, Any]] = []
literal_lookup: Dict[str, str] = {}
doc_names: Dict[str, bool] = {}
overrides = overrides or {}
if not isinstance(doc_variables, list):
doc_variables = []
for var in doc_variables:
if not isinstance(var, dict):
continue
name = str(var.get("name") or "").strip()
if not name:
continue
doc_names[name] = True
canonical = _canonical_env_key(name)
var_type = str(var.get("type") or "string").lower()
default_val = _extract_variable_default(var)
final_val = overrides[name] if name in overrides else default_val
if canonical:
env_map[canonical] = _env_string(final_val)
literal_lookup[canonical] = _powershell_literal(final_val, var_type)
if name in overrides:
new_var = dict(var)
new_var["value"] = overrides[name]
variables.append(new_var)
else:
variables.append(var)
for name, val in overrides.items():
if name in doc_names:
continue
canonical = _canonical_env_key(name)
if canonical:
env_map[canonical] = _env_string(val)
literal_lookup[canonical] = _powershell_literal(val, "string")
variables.append({"name": name, "value": val, "type": "string"})
env_map = _expand_env_aliases(env_map, variables)
return env_map, variables, literal_lookup
_ENV_VAR_PATTERN = re.compile(r"(?i)\$env:(\{)?([A-Za-z0-9_\-]+)(?(1)\})")
def _rewrite_powershell_script(content: str, literal_lookup: Dict[str, str]) -> str:
if not content or not literal_lookup:
return content
def _replace(match: Any) -> str:
name = match.group(2)
canonical = _canonical_env_key(name)
if not canonical:
return match.group(0)
literal = literal_lookup.get(canonical)
if literal is None:
return match.group(0)
return literal
return _ENV_VAR_PATTERN.sub(_replace, content)
def _parse_ts(val: Any) -> Optional[int]:
"""Best effort to parse ISO-ish datetime string or numeric seconds to epoch seconds."""
if val is None:
@@ -416,47 +515,8 @@ class JobScheduler:
if "value" in var:
overrides[name] = var.get("value")
env_map: Dict[str, str] = {}
doc_names: Dict[str, bool] = {}
for var in doc_variables:
if not isinstance(var, dict):
continue
name = str(var.get("name") or "").strip()
if not name:
continue
env_key = _canonical_env_key(name)
if not env_key:
continue
default_val = var.get("default")
if default_val is None and "defaultValue" in var:
default_val = var.get("defaultValue")
if default_val is None and "default_value" in var:
default_val = var.get("default_value")
env_map[env_key] = _env_string(default_val)
doc_names[name] = True
for name, val in overrides.items():
env_key = _canonical_env_key(name)
if not env_key:
continue
env_map[env_key] = _env_string(val)
variables: List[Dict[str, Any]] = []
for var in doc_variables:
if not isinstance(var, dict):
continue
name = str(var.get("name") or "").strip()
if not name:
continue
if name in overrides:
new_var = dict(var)
new_var["value"] = overrides[name]
variables.append(new_var)
else:
variables.append(var)
for name, val in overrides.items():
if name not in doc_names:
variables.append({"name": name, "value": val})
env_map = _expand_env_aliases(env_map, variables)
env_map, variables, literal_lookup = _prepare_variable_context(doc_variables, overrides)
content = _rewrite_powershell_script(content, literal_lookup)
timeout_seconds = 0
try:
timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0))

View File

@@ -2845,6 +2845,107 @@ def _expand_env_aliases(env_map: Dict[str, str], variables: List[Dict[str, Any]]
return expanded
def _powershell_literal(value: Any, var_type: str) -> str:
"""Convert a variable value to a PowerShell literal for substitution."""
typ = str(var_type or "string").lower()
if typ == "boolean":
if isinstance(value, bool):
truthy = value
elif value is None:
truthy = False
elif isinstance(value, (int, float)):
truthy = value != 0
else:
s = str(value).strip().lower()
if s in {"true", "1", "yes", "y", "on"}:
truthy = True
elif s in {"false", "0", "no", "n", "off", ""}:
truthy = False
else:
truthy = bool(s)
return "$true" if truthy else "$false"
if typ == "number":
if value is None or value == "":
return "0"
return str(value)
# Treat credentials and any other type as strings
s = "" if value is None else str(value)
return "'" + s.replace("'", "''") + "'"
def _extract_variable_default(var: Dict[str, Any]) -> Any:
for key in ("value", "default", "defaultValue", "default_value"):
if key in var:
val = var.get(key)
return "" if val is None else val
return ""
def _prepare_variable_context(doc_variables: List[Dict[str, Any]], overrides: Dict[str, Any]):
env_map: Dict[str, str] = {}
variables: List[Dict[str, Any]] = []
literal_lookup: Dict[str, str] = {}
doc_names: Dict[str, bool] = {}
overrides = overrides or {}
if not isinstance(doc_variables, list):
doc_variables = []
for var in doc_variables:
if not isinstance(var, dict):
continue
name = str(var.get("name") or "").strip()
if not name:
continue
doc_names[name] = True
canonical = _canonical_env_key(name)
var_type = str(var.get("type") or "string").lower()
default_val = _extract_variable_default(var)
final_val = overrides[name] if name in overrides else default_val
if canonical:
env_map[canonical] = _env_string(final_val)
literal_lookup[canonical] = _powershell_literal(final_val, var_type)
if name in overrides:
new_var = dict(var)
new_var["value"] = overrides[name]
variables.append(new_var)
else:
variables.append(var)
for name, val in overrides.items():
if name in doc_names:
continue
canonical = _canonical_env_key(name)
if canonical:
env_map[canonical] = _env_string(val)
literal_lookup[canonical] = _powershell_literal(val, "string")
variables.append({"name": name, "value": val, "type": "string"})
env_map = _expand_env_aliases(env_map, variables)
return env_map, variables, literal_lookup
_ENV_VAR_PATTERN = re.compile(r"(?i)\$env:(\{)?([A-Za-z0-9_\-]+)(?(1)\})")
def _rewrite_powershell_script(content: str, literal_lookup: Dict[str, str]) -> str:
if not content or not literal_lookup:
return content
def _replace(match: Any) -> str:
name = match.group(2)
canonical = _canonical_env_key(name)
if not canonical:
return match.group(0)
literal = literal_lookup.get(canonical)
if literal is None:
return match.group(0)
return literal
return _ENV_VAR_PATTERN.sub(_replace, content)
@app.route("/api/scripts/quick_run", methods=["POST"])
def scripts_quick_run():
"""Queue a Quick Job to agents via WebSocket and record Running status.
@@ -2881,23 +2982,6 @@ def scripts_quick_run():
return ""
return str(value)
env_map: Dict[str, str] = {}
doc_names: Dict[str, bool] = {}
for var in doc_variables:
if not isinstance(var, dict):
continue
name = str(var.get("name") or "").strip()
if not name:
continue
env_key = _canonical_env_key(name)
default_val = var.get("default")
if default_val is None and "defaultValue" in var:
default_val = var.get("defaultValue")
if default_val is None and "default_value" in var:
default_val = var.get("default_value")
env_map[env_key] = _env_string(default_val)
doc_names[name] = True
overrides_raw = data.get("variable_values")
overrides: Dict[str, Any] = {}
if isinstance(overrides_raw, dict):
@@ -2906,26 +2990,9 @@ def scripts_quick_run():
if not name:
continue
overrides[name] = val
env_key = _canonical_env_key(name)
env_map[env_key] = _env_string(val)
variables: List[Dict[str, Any]] = []
for var in doc_variables:
if not isinstance(var, dict):
continue
name = str(var.get("name") or "").strip()
if not name:
continue
if name in overrides:
new_var = dict(var)
new_var["value"] = overrides[name]
variables.append(new_var)
else:
variables.append(var)
for name, val in overrides.items():
if name not in doc_names:
variables.append({"name": name, "value": val})
env_map = _expand_env_aliases(env_map, variables)
env_map, variables, literal_lookup = _prepare_variable_context(doc_variables, overrides)
content = _rewrite_powershell_script(content, literal_lookup)
timeout_seconds = 0
try:
timeout_seconds = max(0, int(doc.get("timeout_seconds") or 0))