Fixed OS Overwrites in Agent Hearbeats (Legacy Inventory Code removed as well)

This commit is contained in:
2025-11-30 20:28:38 -07:00
parent 1fa84b0e31
commit d746912a96
3 changed files with 456 additions and 105 deletions

View File

@@ -1977,9 +1977,6 @@ def detect_agent_os():
plat = platform.system().lower()
if plat.startswith('win'):
# Aim for: "Microsoft Windows 11 Pro 24H2 Build 26100.5074"
# Pull details from the registry when available and correct
# historical quirks like CurrentVersion reporting 6.3.
try:
import winreg # Only available on Windows
@@ -2002,6 +1999,7 @@ def detect_agent_os():
return default
product_name = _get("ProductName", "") # e.g., "Windows 11 Pro"
installation_type = _get("InstallationType", "") # e.g., "Server"
edition_id = _get("EditionID", "") # e.g., "Professional"
display_version = _get("DisplayVersion", "") # e.g., "24H2" / "22H2"
release_id = _get("ReleaseId", "") # e.g., "2004" on older Windows 10
@@ -2014,19 +2012,113 @@ def detect_agent_os():
build_int = int(str(build_number).split(".")[0]) if build_number else 0
except Exception:
build_int = 0
if build_int >= 22000:
major_label = "11"
elif build_int >= 10240:
major_label = "10"
wmi_info = {}
try:
cmd = "Get-CimInstance Win32_OperatingSystem | Select-Object Caption,ProductType,BuildNumber | ConvertTo-Json -Compress"
out = subprocess.run(
["powershell", "-NoProfile", "-Command", cmd],
capture_output=True,
text=True,
timeout=5,
)
raw = (out.stdout or "").strip()
if raw:
data = json.loads(raw)
if isinstance(data, list):
data = data[0] if data else {}
if isinstance(data, dict):
wmi_info = data
except Exception:
wmi_info = {}
wmi_caption = ""
caption_val = wmi_info.get("Caption")
if isinstance(caption_val, str):
wmi_caption = caption_val.strip()
if wmi_caption.lower().startswith("microsoft "):
wmi_caption = wmi_caption[10:].strip()
def _parse_int(value) -> int:
try:
return int(str(value).split(".")[0])
except Exception:
return 0
if not build_int:
for candidate in (wmi_info.get("BuildNumber"), None):
if candidate:
parsed = _parse_int(candidate)
if parsed:
build_int = parsed
break
if not build_int:
try:
build_int = _parse_int(sys.getwindowsversion().build) # type: ignore[attr-defined]
except Exception:
build_int = 0
product_type_val = wmi_info.get("ProductType")
if isinstance(product_type_val, str):
try:
product_type_val = int(product_type_val.strip())
except Exception:
product_type_val = None
if not isinstance(product_type_val, int):
try:
product_type_val = getattr(sys.getwindowsversion(), 'product_type', None) # type: ignore[attr-defined]
except Exception:
product_type_val = None
if not isinstance(product_type_val, int):
product_type_val = 0
def _contains_server(text) -> bool:
try:
return isinstance(text, str) and 'server' in text.lower()
except Exception:
return False
server_hints = []
if isinstance(product_type_val, int) and product_type_val not in (0, 1):
server_hints.append("product_type")
if isinstance(product_type_val, int) and product_type_val == 1 and _contains_server(product_name):
server_hints.append("product_type_mismatch")
for hint in (product_name, wmi_caption, edition_id, installation_type):
if _contains_server(hint):
server_hints.append("string_hint")
break
if installation_type and str(installation_type).strip().lower() == 'server':
server_hints.append("installation_type")
if isinstance(edition_id, str) and edition_id.lower().startswith('server'):
server_hints.append("edition_id")
if build_int in (20348, 26100, 17763) and _contains_server(product_name or wmi_caption or edition_id or installation_type):
server_hints.append("build_hint")
is_server = bool(server_hints)
if is_server:
if build_int >= 26100:
family = "Windows Server 2025"
elif build_int >= 20348:
family = "Windows Server 2022"
elif build_int >= 17763:
family = "Windows Server 2019"
else:
family = "Windows Server"
else:
major_label = platform.release()
if build_int >= 22000:
family = "Windows 11"
elif build_int >= 10240:
family = "Windows 10"
else:
family = platform.release() or "Windows"
# Derive friendly edition name, prefer parsing from ProductName
edition = ""
pn = product_name or ""
if pn.lower().startswith("windows "):
tokens = pn.split()
# tokens like ["Windows", "11", "Pro", ...]
# tokens like ["Windows", "Server", "2022", "Standard", ...] or ["Windows", "11", "Pro", ...]
if len(tokens) >= 3:
edition = " ".join(tokens[2:])
if not edition and edition_id:
@@ -2047,12 +2139,8 @@ def detect_agent_os():
}
edition = eid_map.get(edition_id, edition_id)
os_name = f"Windows {major_label}"
# Choose version label: DisplayVersion (preferred) then ReleaseId
version_label = display_version or release_id or ""
# Build string with UBR if present
if isinstance(ubr, int):
build_str = f"{build_number}.{ubr}" if build_number else str(ubr)
else:
@@ -2061,7 +2149,7 @@ def detect_agent_os():
except Exception:
build_str = build_number
parts = ["Microsoft", os_name]
parts = ["Microsoft", family]
if edition:
parts.append(edition)
if version_label:
@@ -2069,8 +2157,6 @@ def detect_agent_os():
if build_str:
parts.append(f"Build {build_str}")
# Correct possible mislabeling in ProductName (e.g., says Windows 10 on Win 11)
# by trusting build-based major_label.
return " ".join(p for p in parts if p).strip()
except Exception:
@@ -2123,7 +2209,6 @@ def _system_uptime_seconds() -> Optional[int]:
def _collect_heartbeat_metrics() -> Dict[str, Any]:
metrics: Dict[str, Any] = {
"operating_system": detect_agent_os(),
"service_mode": SERVICE_MODE,
}
uptime = _system_uptime_seconds()
@@ -2450,7 +2535,6 @@ async def send_heartbeat():
payload = {
"guid": client.guid or _read_agent_guid_from_disk(),
"hostname": socket.gethostname(),
"inventory": {},
"metrics": _collect_heartbeat_metrics(),
}
await client.async_post_json("/api/agent/heartbeat", payload, require_auth=True)