From b073c31ec39dc0beb75369e88c8645403175351d Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Sun, 5 Oct 2025 02:03:37 -0600 Subject: [PATCH] Fix agent hash detection fallback --- Data/Agent/Roles/role_DeviceAudit.py | 133 ++++++++++++++++++++++++--- 1 file changed, 121 insertions(+), 12 deletions(-) diff --git a/Data/Agent/Roles/role_DeviceAudit.py b/Data/Agent/Roles/role_DeviceAudit.py index b3b8a4b..513208c 100644 --- a/Data/Agent/Roles/role_DeviceAudit.py +++ b/Data/Agent/Roles/role_DeviceAudit.py @@ -9,6 +9,7 @@ import shutil import string import asyncio from pathlib import Path +from typing import Optional try: import psutil # type: ignore @@ -134,24 +135,132 @@ def _project_root(): return os.getcwd() -_AGENT_HASH_CACHE = {"path": None, "mtime": None, "value": None} +_AGENT_HASH_CACHE = { + "path": None, + "mtime": None, + "value": None, + "source": None, + "extra": None, +} + + +def _iter_hash_roots(): + seen = set() + root = _project_root() + for _ in range(6): + if not root or root in seen: + break + yield root + seen.add(root) + parent = os.path.dirname(root) + if not parent or parent == root: + break + root = parent + + +def _resolve_git_head_hash(root: str) -> Optional[str]: + git_dir = os.path.join(root, ".git") + head_path = os.path.join(git_dir, "HEAD") + if not os.path.isfile(head_path): + return None + try: + with open(head_path, "r", encoding="utf-8") as fh: + head = fh.read().strip() + except Exception: + return None + if not head: + return None + if head.startswith("ref:"): + ref = head.split(" ", 1)[1].strip() if " " in head else head.split(":", 1)[1].strip() + if not ref: + return None + ref_path = os.path.join(git_dir, *ref.split("/")) + if os.path.isfile(ref_path): + try: + with open(ref_path, "r", encoding="utf-8") as rf: + commit = rf.read().strip() + return commit or None + except Exception: + return None + packed_refs = os.path.join(git_dir, "packed-refs") + if os.path.isfile(packed_refs): + try: + with open(packed_refs, "r", encoding="utf-8") as pf: + for line in pf: + line = line.strip() + if not line or line.startswith("#") or line.startswith("^"): + continue + try: + commit, ref_name = line.split(" ", 1) + except ValueError: + continue + if ref_name.strip() == ref: + commit = commit.strip() + return commit or None + except Exception: + return None + return None + # Detached head contains the commit hash directly + commit = head.splitlines()[0].strip() + return commit or None def _read_agent_hash(): try: - root = _project_root() - path = os.path.join(root, 'github_repo_hash.txt') cache = _AGENT_HASH_CACHE - if not os.path.isfile(path): - cache.update({"path": path, "mtime": None, "value": None}) - return None - mtime = os.path.getmtime(path) - if cache.get("path") == path and cache.get("mtime") == mtime: + for root in _iter_hash_roots(): + path = os.path.join(root, 'github_repo_hash.txt') + if not os.path.isfile(path): + continue + mtime = os.path.getmtime(path) + if ( + cache.get("source") == "file" + and cache.get("path") == path + and cache.get("mtime") == mtime + ): + return cache.get("value") + with open(path, 'r', encoding='utf-8') as fh: + value = fh.read().strip() + cache.update( + { + "source": "file", + "path": path, + "mtime": mtime, + "extra": None, + "value": value or None, + } + ) return cache.get("value") - with open(path, 'r', encoding='utf-8') as fh: - value = fh.read().strip() - cache.update({"path": path, "mtime": mtime, "value": value or None}) - return cache.get("value") + + for root in _iter_hash_roots(): + git_dir = os.path.join(root, '.git') + head_path = os.path.join(git_dir, 'HEAD') + if not os.path.isfile(head_path): + continue + head_mtime = os.path.getmtime(head_path) + packed_path = os.path.join(git_dir, 'packed-refs') + packed_mtime = os.path.getmtime(packed_path) if os.path.isfile(packed_path) else None + if ( + cache.get("source") == "git" + and cache.get("path") == head_path + and cache.get("mtime") == head_mtime + and cache.get("extra") == packed_mtime + ): + return cache.get("value") + commit = _resolve_git_head_hash(root) + cache.update( + { + "source": "git", + "path": head_path, + "mtime": head_mtime, + "extra": packed_mtime, + "value": commit or None, + } + ) + if commit: + return commit + cache.update({"source": None, "path": None, "mtime": None, "extra": None, "value": None}) + return None except Exception: try: _AGENT_HASH_CACHE.update({"value": None})