diff --git a/Data/Agent/Roles/role_ScriptExec_CURRENTUSER.py b/Data/Agent/Roles/role_ScriptExec_CURRENTUSER.py index c31b16ef..4dd83163 100644 --- a/Data/Agent/Roles/role_ScriptExec_CURRENTUSER.py +++ b/Data/Agent/Roles/role_ScriptExec_CURRENTUSER.py @@ -293,56 +293,44 @@ class Role: return job_label = job_id if job_id is not None else 'unknown' _log(f"quick_job_run(currentuser) received payload job_id={job_label}") + context = payload.get('context') if isinstance(payload, dict) else None + + def _result_payload(job_value, status_value, stdout_value="", stderr_value=""): + result = { + 'job_id': job_value, + 'status': status_value, + 'stdout': stdout_value, + 'stderr': stderr_value, + } + if isinstance(context, dict): + result['context'] = context + return result + script_bytes = decode_script_bytes(payload.get('script_content'), payload.get('script_encoding')) if script_bytes is None: _log(f"quick_job_run(currentuser) invalid script payload job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Invalid script payload (unable to decode)', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Invalid script payload (unable to decode)')) return signature_b64 = payload.get('signature') sig_alg = (payload.get('sig_alg') or 'ed25519').lower() signing_key = payload.get('signing_key') if sig_alg and sig_alg not in ('ed25519', 'eddsa'): _log(f"quick_job_run(currentuser) unsupported signature algorithm job_id={job_label} alg={sig_alg}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': f'Unsupported script signature algorithm: {sig_alg}', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', f'Unsupported script signature algorithm: {sig_alg}')) return if not isinstance(signature_b64, str) or not signature_b64.strip(): _log(f"quick_job_run(currentuser) missing signature job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Missing script signature; rejecting payload', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Missing script signature; rejecting payload')) return http_client_fn = getattr(self.ctx, 'hooks', {}).get('http_client') if hasattr(self.ctx, 'hooks') else None client = http_client_fn() if callable(http_client_fn) else None if client is None: _log(f"quick_job_run(currentuser) missing http_client hook job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Signature verification unavailable (client missing)', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Signature verification unavailable (client missing)')) return if not verify_and_store_script_signature(client, script_bytes, signature_b64, signing_key): _log(f"quick_job_run(currentuser) signature verification failed job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Rejected script payload due to invalid signature', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Rejected script payload due to invalid signature')) return _log(f"quick_job_run(currentuser) signature verified job_id={job_label}") content = script_bytes.decode('utf-8', errors='replace') @@ -371,37 +359,26 @@ class Role: except Exception: timeout_seconds = 0 if script_type != 'powershell': - await sio.emit('quick_job_result', { 'job_id': job_id, 'status': 'Failed', 'stdout': '', 'stderr': f"Unsupported type: {script_type}" }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', f'Unsupported type: {script_type}')) return - if run_mode == 'admin': - rc, out, err = -1, '', 'Admin credentialed runs are disabled; use SYSTEM or Current User.' - else: - rc, out, err = await _run_powershell_via_user_task(content, env_map, timeout_seconds) - if rc == -999: - path = _write_temp_script(content, '.ps1', env_map, timeout_seconds) - try: - rc, out, err = await _run_powershell_local(path) - finally: - try: - if path and os.path.isfile(path): - os.remove(path) - except Exception: - pass + + rc, out, err = await _run_powershell_via_user_task(content, env_map, timeout_seconds) + if rc == -999: + rc, out, err = _run_powershell_script_content(content, env_map, timeout_seconds) status = 'Success' if rc == 0 else 'Failed' - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': status, - 'stdout': out, - 'stderr': err, - }) + await sio.emit('quick_job_result', _result_payload(job_id, status, out, err)) except Exception as e: try: - await sio.emit('quick_job_result', { + context = payload.get('context') if isinstance(payload, dict) else None + result = { 'job_id': payload.get('job_id') if isinstance(payload, dict) else None, 'status': 'Failed', 'stdout': '', 'stderr': str(e), - }) + } + if isinstance(context, dict): + result['context'] = context + await sio.emit('quick_job_result', result) except Exception: pass @@ -451,3 +428,5 @@ class Role: QtWidgets.QApplication.instance().quit() except Exception: os._exit(0) + + diff --git a/Data/Agent/Roles/role_ScriptExec_SYSTEM.py b/Data/Agent/Roles/role_ScriptExec_SYSTEM.py index 0c262285..bafdba47 100644 --- a/Data/Agent/Roles/role_ScriptExec_SYSTEM.py +++ b/Data/Agent/Roles/role_ScriptExec_SYSTEM.py @@ -308,56 +308,44 @@ class Role: script_type = (payload.get('script_type') or '').lower() job_label = job_id if job_id is not None else 'unknown' _log(f"quick_job_run(system) received payload job_id={job_label}") + context = payload.get('context') if isinstance(payload, dict) else None + + def _result_payload(job_value, status_value, stdout_value="", stderr_value=""): + result = { + 'job_id': job_value, + 'status': status_value, + 'stdout': stdout_value, + 'stderr': stderr_value, + } + if isinstance(context, dict): + result['context'] = context + return result + script_bytes = decode_script_bytes(payload.get('script_content'), payload.get('script_encoding')) if script_bytes is None: _log(f"quick_job_run(system) invalid script payload job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Invalid script payload (unable to decode)', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Invalid script payload (unable to decode)')) return signature_b64 = payload.get('signature') sig_alg = (payload.get('sig_alg') or 'ed25519').lower() signing_key = payload.get('signing_key') if sig_alg and sig_alg not in ('ed25519', 'eddsa'): _log(f"quick_job_run(system) unsupported signature algorithm job_id={job_label} alg={sig_alg}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': f'Unsupported script signature algorithm: {sig_alg}', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', f'Unsupported script signature algorithm: {sig_alg}')) return if not isinstance(signature_b64, str) or not signature_b64.strip(): _log(f"quick_job_run(system) missing signature job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Missing script signature; rejecting payload', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Missing script signature; rejecting payload')) return http_client_fn = getattr(self.ctx, 'hooks', {}).get('http_client') if hasattr(self.ctx, 'hooks') else None client = http_client_fn() if callable(http_client_fn) else None if client is None: _log(f"quick_job_run(system) missing http_client hook job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Signature verification unavailable (client missing)', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Signature verification unavailable (client missing)')) return if not verify_and_store_script_signature(client, script_bytes, signature_b64, signing_key): _log(f"quick_job_run(system) signature verification failed job_id={job_label}", error=True) - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': 'Rejected script payload due to invalid signature', - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', 'Rejected script payload due to invalid signature')) return _log(f"quick_job_run(system) signature verified job_id={job_label}") content = script_bytes.decode('utf-8', errors='replace') @@ -386,30 +374,26 @@ class Role: except Exception: timeout_seconds = 0 if script_type != 'powershell': - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': 'Failed', - 'stdout': '', - 'stderr': f"Unsupported type: {script_type}" - }) + await sio.emit('quick_job_result', _result_payload(job_id, 'Failed', '', f"Unsupported type: {script_type}")) return rc, out, err = _run_powershell_via_system_task(content, env_map, timeout_seconds) if rc == -999: rc, out, err = _run_powershell_script_content(content, env_map, timeout_seconds) status = 'Success' if rc == 0 else 'Failed' - await sio.emit('quick_job_result', { - 'job_id': job_id, - 'status': status, - 'stdout': out, - 'stderr': err, - }) + await sio.emit('quick_job_result', _result_payload(job_id, status, out, err)) except Exception as e: - try: - await sio.emit('quick_job_result', { - 'job_id': payload.get('job_id') if isinstance(payload, dict) else None, + context = payload.get('context') if isinstance(payload, dict) else None + def _error_payload(job_value, message): + result = { + 'job_id': job_value, 'status': 'Failed', 'stdout': '', - 'stderr': str(e), - }) + 'stderr': message, + } + if isinstance(context, dict): + result['context'] = context + return result + try: + await sio.emit('quick_job_result', _error_payload(payload.get('job_id') if isinstance(payload, dict) else None, str(e))) except Exception: pass diff --git a/Data/Agent/agent.py b/Data/Agent/agent.py index 60d84569..f5cc533c 100644 --- a/Data/Agent/agent.py +++ b/Data/Agent/agent.py @@ -3200,14 +3200,18 @@ if __name__=='__main__': script_bytes = _decode_script_bytes(payload.get('script_content'), encoding_hint) run_mode = (payload.get('run_mode') or 'current_user').lower() _log_agent(f"quick_job_run received payload job_id={job_label} run_mode={run_mode}") + context = payload.get('context') if isinstance(payload, dict) else None if script_bytes is None: err = 'Invalid script payload (unable to decode)' - await sio.emit('quick_job_result', { + result_payload = { 'job_id': job_id, 'status': 'Failed', 'stdout': '', 'stderr': err, - }) + } + if isinstance(context, dict): + result_payload['context'] = context + await sio.emit('quick_job_result', result_payload) _log_agent(err) _log_agent(err, fname='agent.error.log') return @@ -3216,35 +3220,44 @@ if __name__=='__main__': signing_key = payload.get('signing_key') if sig_alg and sig_alg not in ('ed25519', 'eddsa'): err = f"Unsupported script signature algorithm: {sig_alg}" - await sio.emit('quick_job_result', { + result_payload = { 'job_id': job_id, 'status': 'Failed', 'stdout': '', 'stderr': err, - }) + } + if isinstance(context, dict): + result_payload['context'] = context + await sio.emit('quick_job_result', result_payload) _log_agent(err) _log_agent(err, fname='agent.error.log') return if not isinstance(signature_b64, str) or not signature_b64.strip(): err = 'Missing script signature; rejecting payload' - await sio.emit('quick_job_result', { + result_payload = { 'job_id': job_id, 'status': 'Failed', 'stdout': '', 'stderr': err, - }) + } + if isinstance(context, dict): + result_payload['context'] = context + await sio.emit('quick_job_result', result_payload) _log_agent(err) _log_agent(err, fname='agent.error.log') return client = http_client() if not _verify_and_store_script_signature(client, script_bytes, signature_b64, signing_key): err = 'Rejected script payload due to invalid signature' - await sio.emit('quick_job_result', { + result_payload = { 'job_id': job_id, 'status': 'Failed', 'stdout': '', 'stderr': err, - }) + } + if isinstance(context, dict): + result_payload['context'] = context + await sio.emit('quick_job_result', result_payload) _log_agent(err) _log_agent(err, fname='agent.error.log') return @@ -3279,21 +3292,28 @@ if __name__=='__main__': # Fallback to plain local run rc, out, err = _run_powershell_script_content_local(content) status = 'Success' if rc == 0 else 'Failed' - await sio.emit('quick_job_result', { + result_payload = { 'job_id': job_id, 'status': status, 'stdout': out or '', 'stderr': err or '', - }) + } + if isinstance(context, dict): + result_payload['context'] = context + await sio.emit('quick_job_result', result_payload) _log_agent(f"quick_job_result sent: job_id={job_id} status={status}") except Exception as e: try: - await sio.emit('quick_job_result', { + result_payload = { 'job_id': payload.get('job_id') if isinstance(payload, dict) else None, 'status': 'Failed', 'stdout': '', 'stderr': str(e), - }) + } + context = payload.get('context') if isinstance(payload, dict) else None + if isinstance(context, dict): + result_payload['context'] = context + await sio.emit('quick_job_result', result_payload) except Exception: pass _log_agent(f"quick_job_run handler error: {e}", fname='agent.error.log') diff --git a/Data/Engine/services/API/assemblies/execution.py b/Data/Engine/services/API/assemblies/execution.py index d604b45f..d11a735d 100644 --- a/Data/Engine/services/API/assemblies/execution.py +++ b/Data/Engine/services/API/assemblies/execution.py @@ -27,6 +27,8 @@ if TYPE_CHECKING: # pragma: no cover - typing aide from .. import EngineServiceAdapters +from ...assemblies.service import AssemblyRuntimeService + def _assemblies_root() -> Path: base = Path(__file__).resolve() @@ -255,8 +257,12 @@ def rewrite_powershell_script(content: str, literal_lookup: Dict[str, str]) -> s return _ENV_VAR_PATTERN.sub(_replace, content) -def _load_assembly_document(abs_path: str, default_type: str) -> Dict[str, Any]: - abs_path_str = os.fspath(abs_path) +def _load_assembly_document( + source_identifier: str, + default_type: str, + payload: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + abs_path_str = os.fspath(source_identifier) base_name = os.path.splitext(os.path.basename(abs_path_str))[0] doc: Dict[str, Any] = { "name": base_name, @@ -267,110 +273,114 @@ def _load_assembly_document(abs_path: str, default_type: str) -> Dict[str, Any]: "variables": [], "files": [], "timeout_seconds": 3600, + "metadata": {}, } - if abs_path_str.lower().endswith(".json") and os.path.isfile(abs_path_str): + data: Dict[str, Any] = {} + if isinstance(payload, dict): + data = payload + elif abs_path_str.lower().endswith(".json") and os.path.isfile(abs_path_str): try: with open(abs_path_str, "r", encoding="utf-8") as fh: data = json.load(fh) except Exception: data = {} - if isinstance(data, dict): - doc["name"] = str(data.get("name") or doc["name"]) - doc["description"] = str(data.get("description") or "") - cat = str(data.get("category") or doc["category"]).strip().lower() - if cat in {"application", "script"}: - doc["category"] = cat - typ = str(data.get("type") or data.get("script_type") or default_type).strip().lower() - if typ in {"powershell", "batch", "bash", "ansible"}: - doc["type"] = typ - script_val = data.get("script") - content_val = data.get("content") - script_lines = data.get("script_lines") - if isinstance(script_lines, list): - try: - doc["script"] = "\n".join(str(line) for line in script_lines) - except Exception: - doc["script"] = "" - elif isinstance(script_val, str): - doc["script"] = script_val - else: - if isinstance(content_val, str): - doc["script"] = content_val - encoding_hint = str( - data.get("script_encoding") or data.get("scriptEncoding") or "" - ).strip().lower() - doc["script"] = _decode_script_content(doc.get("script"), encoding_hint) - if encoding_hint in {"base64", "b64", "base-64"}: - doc["script_encoding"] = "base64" - else: - probe_source = "" - if isinstance(script_val, str) and script_val: - probe_source = script_val - elif isinstance(content_val, str) and content_val: - probe_source = content_val - decoded_probe = _decode_base64_text(probe_source) if probe_source else None - if decoded_probe is not None: - doc["script_encoding"] = "base64" - doc["script"] = decoded_probe.replace("\r\n", "\n") - else: - doc["script_encoding"] = "plain" + if isinstance(data, dict) and data: + doc["name"] = str(data.get("name") or doc["name"]) + doc["description"] = str(data.get("description") or "") + doc["metadata"] = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + cat = str(data.get("category") or doc["category"]).strip().lower() + if cat in {"application", "script"}: + doc["category"] = cat + typ = str(data.get("type") or data.get("script_type") or default_type).strip().lower() + if typ in {"powershell", "batch", "bash", "ansible"}: + doc["type"] = typ + script_val = data.get("script") + content_val = data.get("content") + script_lines = data.get("script_lines") + if isinstance(script_lines, list): try: - timeout_raw = data.get("timeout_seconds", data.get("timeout")) - if timeout_raw is None: - doc["timeout_seconds"] = 3600 - else: - doc["timeout_seconds"] = max(0, int(timeout_raw)) + doc["script"] = "\n".join(str(line) for line in script_lines) except Exception: + doc["script"] = "" + elif isinstance(script_val, str): + doc["script"] = script_val + elif isinstance(content_val, str): + doc["script"] = content_val + encoding_hint = str(data.get("script_encoding") or data.get("scriptEncoding") or "").strip().lower() + doc["script"] = _decode_script_content(doc.get("script"), encoding_hint) + if encoding_hint in {"base64", "b64", "base-64"}: + doc["script_encoding"] = "base64" + else: + probe_source = "" + if isinstance(script_val, str) and script_val: + probe_source = script_val + elif isinstance(content_val, str) and content_val: + probe_source = content_val + decoded_probe = _decode_base64_text(probe_source) if probe_source else None + if decoded_probe is not None: + doc["script_encoding"] = "base64" + doc["script"] = decoded_probe.replace("\r\n", "\n") + else: + doc["script_encoding"] = "plain" + try: + timeout_raw = data.get("timeout_seconds", data.get("timeout")) + if timeout_raw is None: doc["timeout_seconds"] = 3600 - vars_in = data.get("variables") if isinstance(data.get("variables"), list) else [] - doc["variables"] = [] - for item in vars_in: - if not isinstance(item, dict): - continue - name = str(item.get("name") or item.get("key") or "").strip() - if not name: - continue - vtype = str(item.get("type") or "string").strip().lower() - if vtype not in {"string", "number", "boolean", "credential"}: - vtype = "string" - doc["variables"].append( - { - "name": name, - "label": str(item.get("label") or ""), - "type": vtype, - "default": item.get("default", item.get("default_value")), - "required": bool(item.get("required")), - "description": str(item.get("description") or ""), - } - ) - files_in = data.get("files") if isinstance(data.get("files"), list) else [] - doc["files"] = [] - for file_item in files_in: - if not isinstance(file_item, dict): - continue - fname = file_item.get("file_name") or file_item.get("name") - if not fname or not isinstance(file_item.get("data"), str): - continue - try: - size_val = int(file_item.get("size") or 0) - except Exception: - size_val = 0 - doc["files"].append( - { - "file_name": str(fname), - "size": size_val, - "mime_type": str(file_item.get("mime_type") or file_item.get("mimeType") or ""), - "data": file_item.get("data"), - } - ) + else: + doc["timeout_seconds"] = max(0, int(timeout_raw)) + except Exception: + doc["timeout_seconds"] = 3600 + vars_in = data.get("variables") if isinstance(data.get("variables"), list) else [] + doc["variables"] = [] + for item in vars_in: + if not isinstance(item, dict): + continue + name = str(item.get("name") or item.get("key") or "").strip() + if not name: + continue + vtype = str(item.get("type") or "string").strip().lower() + if vtype not in {"string", "number", "boolean", "credential"}: + vtype = "string" + doc["variables"].append( + { + "name": name, + "label": str(item.get("label") or ""), + "type": vtype, + "default": item.get("default", item.get("default_value")), + "required": bool(item.get("required")), + "description": str(item.get("description") or ""), + } + ) + files_in = data.get("files") if isinstance(data.get("files"), list) else [] + doc["files"] = [] + for file_item in files_in: + if not isinstance(file_item, dict): + continue + fname = file_item.get("file_name") or file_item.get("name") + if not fname or not isinstance(file_item.get("data"), str): + continue + try: + size_val = int(file_item.get("size") or 0) + except Exception: + size_val = 0 + doc["files"].append( + { + "file_name": str(fname), + "size": size_val, + "mime_type": str(file_item.get("mime_type") or file_item.get("mimeType") or ""), + "data": file_item.get("data"), + } + ) return doc - try: - with open(abs_path_str, "r", encoding="utf-8", errors="replace") as fh: - content = fh.read() - except Exception: - content = "" - normalized_script = (content or "").replace("\r\n", "\n") - doc["script"] = normalized_script + if os.path.isfile(abs_path_str): + try: + with open(abs_path_str, "r", encoding="utf-8", errors="replace") as fh: + content = fh.read() + except Exception: + content = "" + doc["script"] = (content or "").replace("\r\n", "\n") + else: + doc["script"] = "" return doc @@ -390,6 +400,10 @@ def register_execution(app: "Flask", adapters: "EngineServiceAdapters") -> None: blueprint = Blueprint("assemblies_execution", __name__) service_log = adapters.service_log + assembly_cache = adapters.context.assembly_cache + if assembly_cache is None: + raise RuntimeError("Assembly cache is not initialised; ensure Engine bootstrap executed.") + assembly_runtime = AssemblyRuntimeService(assembly_cache, logger=adapters.context.logger) @blueprint.route("/api/scripts/quick_run", methods=["POST"]) def scripts_quick_run(): @@ -406,34 +420,65 @@ def register_execution(app: "Flask", adapters: "EngineServiceAdapters") -> None: rel_path_canonical = rel_path_normalized + assembly_source = "runtime" + assembly_guid: Optional[str] = None + abs_path_str = rel_path_canonical + doc: Optional[Dict[str, Any]] = None + record: Optional[Dict[str, Any]] = None try: - scripts_root = _scripts_root() - assemblies_root = scripts_root.parent.resolve() - abs_path = (assemblies_root / rel_path_canonical).resolve() - except Exception as exc: # pragma: no cover - defensive guard - service_log( - "assemblies", - f"quick job failed to resolve script path={rel_path_input!r}: {exc}", - level="ERROR", - ) - return jsonify({"error": "Failed to resolve script path"}), 500 + record = assembly_runtime.resolve_document_by_source_path(rel_path_canonical) + except Exception: + record = None + if record: + payload_doc = record.get("payload_json") + if not isinstance(payload_doc, dict): + raw_payload = record.get("payload") + if isinstance(raw_payload, str): + try: + payload_doc = json.loads(raw_payload) + except Exception: + payload_doc = None + if isinstance(payload_doc, dict): + doc = _load_assembly_document(rel_path_canonical, "powershell", payload=payload_doc) + if doc: + metadata_block = doc.get("metadata") if isinstance(doc.get("metadata"), dict) else {} + if isinstance(metadata_block, dict): + assembly_guid = metadata_block.get("assembly_guid") + if not doc.get("name"): + doc["name"] = record.get("display_name") or doc.get("name") + if doc is None: + assembly_source = "filesystem" + try: + scripts_root = _scripts_root() + assemblies_root = scripts_root.parent.resolve() + abs_path = (assemblies_root / rel_path_canonical).resolve() + except Exception as exc: # pragma: no cover - defensive guard + service_log( + "assemblies", + f"quick job failed to resolve script path={rel_path_input!r}: {exc}", + level="ERROR", + ) + return jsonify({"error": "Failed to resolve script path"}), 500 - scripts_root_str = str(scripts_root) - abs_path_str = str(abs_path) - try: - within_scripts = os.path.commonpath([scripts_root_str, abs_path_str]) == scripts_root_str - except ValueError: - within_scripts = False + scripts_root_str = str(scripts_root) + abs_path_str = str(abs_path) + try: + within_scripts = os.path.commonpath([scripts_root_str, abs_path_str]) == scripts_root_str + except ValueError: + within_scripts = False - if not within_scripts or not os.path.isfile(abs_path_str): - service_log( - "assemblies", - f"quick job requested missing or out-of-scope script input={rel_path_input!r} normalized={rel_path_canonical}", - level="WARNING", - ) + if not within_scripts or not os.path.isfile(abs_path_str): + service_log( + "assemblies", + f"quick job requested missing or out-of-scope script input={rel_path_input!r} normalized={rel_path_canonical}", + level="WARNING", + ) + return jsonify({"error": "Script not found"}), 404 + + doc = _load_assembly_document(abs_path_str, "powershell") + if not doc: return jsonify({"error": "Script not found"}), 404 - doc = _load_assembly_document(abs_path, "powershell") script_type = (doc.get("type") or "powershell").lower() if script_type != "powershell": return jsonify({"error": f"Unsupported script type '{script_type}'. Only PowerShell is supported."}), 400 @@ -476,7 +521,9 @@ def register_execution(app: "Flask", adapters: "EngineServiceAdapters") -> None: except Exception: timeout_seconds = 0 - friendly_name = (doc.get("name") or "").strip() or os.path.basename(abs_path) + friendly_name = (doc.get("name") or "").strip() + if not friendly_name: + friendly_name = os.path.basename(rel_path_canonical) now = int(time.time()) results: List[Dict[str, Any]] = [] socketio = getattr(adapters.context, "socketio", None) @@ -528,6 +575,10 @@ def register_execution(app: "Flask", adapters: "EngineServiceAdapters") -> None: payload["sig_alg"] = "ed25519" if signing_key_b64: payload["signing_key"] = signing_key_b64 + context_block = payload.setdefault("context", {}) + context_block["assembly_source"] = assembly_source + if assembly_guid: + context_block["assembly_guid"] = assembly_guid socketio.emit("quick_job_run", payload) try: @@ -546,7 +597,7 @@ def register_execution(app: "Flask", adapters: "EngineServiceAdapters") -> None: results.append({"hostname": host, "job_id": job_id, "status": "Running"}) service_log( "assemblies", - f"quick job queued hostname={host} path={rel_path_canonical} run_mode={run_mode}", + f"quick job queued hostname={host} path={rel_path_canonical} run_mode={run_mode} source={assembly_source}", ) except Exception as exc: if conn is not None: diff --git a/Data/Engine/services/API/scheduled_jobs/management.py b/Data/Engine/services/API/scheduled_jobs/management.py index f1f8e58b..2061333b 100644 --- a/Data/Engine/services/API/scheduled_jobs/management.py +++ b/Data/Engine/services/API/scheduled_jobs/management.py @@ -20,12 +20,15 @@ from __future__ import annotations import time from typing import TYPE_CHECKING, List +from ...assemblies.service import AssemblyRuntimeService from . import job_scheduler if TYPE_CHECKING: # pragma: no cover - typing aide from flask import Flask from .. import EngineServiceAdapters + + def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"): """Instantiate the Engine job scheduler and attach it to the Engine context.""" @@ -36,6 +39,11 @@ def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"): if socketio is None: raise RuntimeError("Socket.IO instance is required to initialise the scheduled job service.") + assembly_cache = adapters.context.assembly_cache + if assembly_cache is None: + raise RuntimeError("Assembly cache is required to initialise the scheduled job service.") + assembly_runtime = AssemblyRuntimeService(assembly_cache, logger=adapters.context.logger) + database_path = adapters.context.database_path script_signer = adapters.script_signer @@ -87,6 +95,7 @@ def ensure_scheduler(app: "Flask", adapters: "EngineServiceAdapters"): database_path, script_signer=script_signer, service_logger=adapters.service_log, + assembly_runtime=assembly_runtime, ) job_scheduler.set_online_lookup(scheduler, _online_hostnames_snapshot) scheduler.start() diff --git a/Data/Engine/services/WebSocket/__init__.py b/Data/Engine/services/WebSocket/__init__.py index af235fdd..4bef3597 100644 --- a/Data/Engine/services/WebSocket/__init__.py +++ b/Data/Engine/services/WebSocket/__init__.py @@ -85,6 +85,9 @@ def register_realtime(socket_server: SocketIO, context: EngineContext) -> None: cursor = None broadcast_payload: Optional[Dict[str, Any]] = None + ctx_payload = data.get("context") + context_info: Optional[Dict[str, Any]] = ctx_payload if isinstance(ctx_payload, dict) else None + try: conn = adapters.db_conn_factory() cursor = conn.cursor() @@ -105,10 +108,30 @@ def register_realtime(socket_server: SocketIO, context: EngineContext) -> None: except sqlite3.Error: link = None + run_id: Optional[int] = None + scheduled_ts_ctx: Optional[int] = None if link: try: run_id = int(link[0]) - ts_now = _now_ts() + except Exception: + run_id = None + + if run_id is None and context_info: + ctx_run = context_info.get("scheduled_job_run_id") or context_info.get("run_id") + try: + if ctx_run is not None: + run_id = int(ctx_run) + except (TypeError, ValueError): + run_id = None + try: + if context_info.get("scheduled_ts") is not None: + scheduled_ts_ctx = int(context_info.get("scheduled_ts")) + except (TypeError, ValueError): + scheduled_ts_ctx = None + + if run_id is not None: + ts_now = _now_ts() + try: if status.lower() == "running": cursor.execute( "UPDATE scheduled_job_runs SET status='Running', updated_at=? WHERE id=?", @@ -125,13 +148,29 @@ def register_realtime(socket_server: SocketIO, context: EngineContext) -> None: """, (status, ts_now, ts_now, run_id), ) + if scheduled_ts_ctx is not None: + cursor.execute( + "UPDATE scheduled_job_runs SET scheduled_ts=COALESCE(scheduled_ts, ?) WHERE id=?", + (scheduled_ts_ctx, run_id), + ) conn.commit() + adapters.service_log( + "scheduled_jobs", + f"scheduled run update run_id={run_id} activity_id={job_id} status={status}", + ) except Exception as exc: # pragma: no cover - defensive guard logger.debug( - "quick_job_result failed to update scheduled_job_runs for job_id=%s: %s", + "quick_job_result failed to update scheduled_job_runs for job_id=%s run_id=%s: %s", job_id, + run_id, exc, ) + elif context_info: + adapters.service_log( + "scheduled_jobs", + f"scheduled run update skipped (no run_id) activity_id={job_id} status={status} context={context_info}", + level="WARNING", + ) try: cursor.execute( diff --git a/Data/Engine/services/assemblies/service.py b/Data/Engine/services/assemblies/service.py index 33099147..38e818a9 100644 --- a/Data/Engine/services/assemblies/service.py +++ b/Data/Engine/services/assemblies/service.py @@ -63,6 +63,33 @@ class AssemblyRuntimeService: data = self._serialize_entry(entry, include_payload=True, payload_text=payload_text) return data + def resolve_document_by_source_path( + self, + source_path: str, + *, + include_payload: bool = True, + ) -> Optional[Dict[str, Any]]: + """Return an assembly record whose metadata source_path matches the provided value.""" + + normalized = _normalize_source_path(source_path) + if not normalized: + return None + lookup_key = normalized.lower() + try: + entries = self._cache.list_entries() + except Exception: + entries = [] + for entry in entries: + metadata = entry.record.metadata or {} + candidate = _normalize_source_path(metadata.get("source_path")) + if not candidate: + continue + if candidate.lower() != lookup_key: + continue + payload_text = self._read_payload_text(entry.record.assembly_guid) if include_payload else None + return self._serialize_entry(entry, include_payload=include_payload, payload_text=payload_text) + return None + def export_assembly(self, assembly_guid: str) -> Dict[str, Any]: entry = self._cache.get_entry(assembly_guid) if not entry: @@ -328,6 +355,27 @@ def _payload_type_from_kind(kind: str) -> PayloadType: return PayloadType.UNKNOWN +def _normalize_source_path(value: Any) -> str: + """Normalise metadata source_path for comparison.""" + + if value is None: + return "" + text = str(value).replace("\\", "/").strip() + if not text: + return "" + segments = [] + for part in text.split("/"): + candidate = part.strip() + if not candidate or candidate == ".": + continue + if candidate == "..": + return "" + segments.append(candidate) + if not segments: + return "" + return "/".join(segments) + + def _serialize_payload(value: Any) -> str: if isinstance(value, (dict, list)): return json.dumps(value, indent=2, sort_keys=True)