Initial Device Type Logic

This commit is contained in:
2025-09-05 19:39:11 -06:00
parent 05b2d7ef77
commit 986c9bae7f
4 changed files with 250 additions and 2 deletions

View File

@@ -152,6 +152,237 @@ def detect_agent_os():
return "Unknown"
def _detect_virtual_machine() -> bool:
"""Best-effort detection of whether this system is a virtual machine.
Uses platform-specific signals but avoids heavy dependencies.
"""
try:
plat = platform.system().lower()
if plat == "linux":
# Prefer systemd-detect-virt if available
try:
out = subprocess.run([
"systemd-detect-virt", "--vm"
], capture_output=True, text=True, timeout=3)
if out.returncode == 0 and (out.stdout or "").strip():
return True
except Exception:
pass
# Fallback to DMI sysfs strings
for p in (
"/sys/class/dmi/id/product_name",
"/sys/class/dmi/id/sys_vendor",
"/sys/class/dmi/id/board_vendor",
):
try:
with open(p, "r", encoding="utf-8", errors="ignore") as fh:
s = (fh.read() or "").lower()
if any(k in s for k in (
"kvm", "vmware", "virtualbox", "qemu", "xen", "hyper-v", "microsoft corporation")):
return True
except Exception:
pass
elif plat == "windows":
# Inspect model/manufacturer via CIM
try:
ps_cmd = (
"$cs = Get-CimInstance Win32_ComputerSystem; "
"$model = [string]$cs.Model; $manu = [string]$cs.Manufacturer; "
"Write-Output ($model + '|' + $manu)"
)
out = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_cmd],
capture_output=True,
text=True,
timeout=6,
)
s = (out.stdout or "").strip().lower()
if any(k in s for k in ("virtual", "vmware", "virtualbox", "kvm", "qemu", "xen", "hyper-v")):
return True
except Exception:
pass
# Fallback: registry BIOS strings often include virtualization hints
try:
import winreg # type: ignore
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"HARDWARE\DESCRIPTION\System") as k:
for name in ("SystemBiosVersion", "VideoBiosVersion"):
try:
val, _ = winreg.QueryValueEx(k, name)
s = str(val).lower()
if any(x in s for x in ("virtual", "vmware", "virtualbox", "qemu", "xen", "hyper-v")):
return True
except Exception:
continue
except Exception:
pass
elif plat == "darwin":
# macOS guest detection is tricky; limited heuristic
try:
out = subprocess.run([
"sysctl", "-n", "machdep.cpu.features"
], capture_output=True, text=True, timeout=3)
s = (out.stdout or "").lower()
if "vmm" in s:
return True
except Exception:
pass
except Exception:
pass
return False
def _detect_device_type_non_vm() -> str:
"""Classify non-VM device as Laptop, Desktop, or Server.
This is intentionally conservative; if unsure, returns 'Desktop'.
"""
try:
plat = platform.system().lower()
if plat == "windows":
# Prefer PCSystemTypeEx, then chassis types, then battery presence
try:
ps_cmd = (
"$cs = Get-CimInstance Win32_ComputerSystem; "
"$typeEx = [int]($cs.PCSystemTypeEx); "
"$type = [int]($cs.PCSystemType); "
"$ch = (Get-CimInstance Win32_SystemEnclosure).ChassisTypes; "
"$hasBatt = @(Get-CimInstance Win32_Battery).Count -gt 0; "
"Write-Output ($typeEx.ToString() + '|' + $type.ToString() + '|' + "
"([string]::Join(',', $ch)) + '|' + $hasBatt)"
)
out = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_cmd],
capture_output=True,
text=True,
timeout=6,
)
resp = (out.stdout or "").strip()
parts = resp.split("|")
type_ex = int(parts[0]) if len(parts) > 0 and parts[0].isdigit() else None
type_pc = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else None
chassis = []
if len(parts) > 2 and parts[2].strip():
for t in parts[2].split(','):
t = t.strip()
if t.isdigit():
chassis.append(int(t))
has_batt = False
if len(parts) > 3:
has_batt = parts[3].strip().lower() in ("true", "1")
# PCSystemTypeEx mapping per MS docs
if type_ex in (4, 5, 7, 8):
return "Server"
if type_ex == 2:
return "Laptop"
if type_ex in (1, 3):
return "Desktop"
# Fallback to PCSystemType
if type_pc in (4, 5):
return "Server"
if type_pc == 2:
return "Laptop"
if type_pc in (1, 3):
return "Desktop"
# ChassisType mapping (DMTF)
laptop_types = {8, 9, 10, 14, 30, 31}
server_types = {17, 23}
desktop_types = {3, 4, 5, 6, 7, 15, 16, 24, 35}
if any(ct in laptop_types for ct in chassis):
return "Laptop"
if any(ct in server_types for ct in chassis):
return "Server"
if any(ct in desktop_types for ct in chassis):
return "Desktop"
if has_batt:
return "Laptop"
except Exception:
pass
return "Desktop"
if plat == "linux":
# hostnamectl exposes chassis when available
try:
out = subprocess.run(["hostnamectl"], capture_output=True, text=True, timeout=3)
for line in (out.stdout or "").splitlines():
if ":" in line:
k, v = line.split(":", 1)
if k.strip().lower() == "chassis":
val = v.strip().lower()
if "laptop" in val:
return "Laptop"
if "desktop" in val:
return "Desktop"
if "server" in val:
return "Server"
break
except Exception:
pass
# DMI chassis type numeric
try:
with open("/sys/class/dmi/id/chassis_type", "r", encoding="utf-8", errors="ignore") as fh:
s = (fh.read() or "").strip()
ct = int(s)
laptop_types = {8, 9, 10, 14, 30, 31}
server_types = {17, 23}
desktop_types = {3, 4, 5, 6, 7, 15, 16, 24, 35}
if ct in laptop_types:
return "Laptop"
if ct in server_types:
return "Server"
if ct in desktop_types:
return "Desktop"
except Exception:
pass
# Battery presence heuristic
try:
if os.path.isdir("/sys/class/power_supply"):
for name in os.listdir("/sys/class/power_supply"):
if name.lower().startswith("bat"):
return "Laptop"
except Exception:
pass
return "Desktop"
if plat == "darwin":
try:
out = subprocess.run(["sysctl", "-n", "hw.model"], capture_output=True, text=True, timeout=3)
model = (out.stdout or "").strip()
if model:
if model.lower().startswith("macbook"):
return "Laptop"
# iMac, Macmini, MacPro -> treat as Desktop
return "Desktop"
except Exception:
pass
return "Desktop"
except Exception:
pass
return "Desktop"
def detect_device_type() -> str:
"""Return one of: 'Laptop', 'Desktop', 'Server', 'Virtual Machine'."""
try:
if _detect_virtual_machine():
return "Virtual Machine"
return _detect_device_type_non_vm()
except Exception:
return "Desktop"
def _get_internal_ip():
"""Best-effort detection of primary IPv4 address without external reachability.
@@ -325,6 +556,7 @@ def collect_summary(config):
return {
"hostname": socket.gethostname(),
"operating_system": config.data.get("agent_operating_system", detect_agent_os()),
"device_type": config.data.get("device_type", detect_device_type()),
"last_user": last_user,
"internal_ip": _get_internal_ip(),
"external_ip": external_ip,

View File

@@ -211,6 +211,7 @@ export default function DeviceDetails({ device, onBack }) {
const summaryItems = [
{ label: "Device Name", value: summary.hostname || agent.hostname || device?.hostname || "unknown" },
{ label: "Operating System", value: summary.operating_system || agent.agent_operating_system || "unknown" },
{ label: "Device Type", value: summary.device_type || "unknown" },
{ label: "Last User", value: (
<Box sx={{ display: 'flex', alignItems: 'center', gap: 1 }}>
<Box component="span" sx={{

View File

@@ -95,7 +95,7 @@ export default function DeviceList({ onSelectDevice }) {
os: a.agent_operating_system || a.os || "-",
// Enriched fields from details cache
lastUser: details.lastUser || "",
type: "", // Placeholder until provided by backend
type: a.device_type || details.type || "",
created: details.created || "",
createdTs: details.createdTs || 0,
};
@@ -128,9 +128,10 @@ export default function DeviceList({ onSelectDevice }) {
const parsed = Date.parse(createdRaw.replace(" ", "T"));
createdTs = isNaN(parsed) ? 0 : Math.floor(parsed / 1000);
}
const deviceType = (summary.device_type || "").trim();
setDetailsByHost((prev) => ({
...prev,
[h]: { lastUser, created: createdRaw, createdTs },
[h]: { lastUser, created: createdRaw, createdTs, type: deviceType },
}));
} catch {
// ignore per-host failure
@@ -146,6 +147,7 @@ export default function DeviceList({ onSelectDevice }) {
return {
...r,
lastUser: det.lastUser || r.lastUser,
type: det.type || r.type,
created: det.created || r.created,
createdTs: det.createdTs || r.createdTs,
};

View File

@@ -743,6 +743,7 @@ def load_agents_from_db():
"agent_operating_system": summary.get("operating_system")
or summary.get("agent_operating_system")
or "-",
"device_type": summary.get("device_type") or "",
"last_seen": summary.get("last_seen") or 0,
"status": "Offline",
}
@@ -809,6 +810,18 @@ def save_agent_details():
last_seen = (prev_details.get("summary") or {}).get("last_seen")
if last_seen:
incoming_summary["last_seen"] = int(last_seen)
# Refresh server-side cache so /api/agents includes latest OS and device type
try:
if agent_id and agent_id in registered_agents:
rec = registered_agents[agent_id]
os_name = incoming_summary.get("operating_system") or incoming_summary.get("agent_operating_system")
if os_name:
rec["agent_operating_system"] = os_name
dt = (incoming_summary.get("device_type") or "").strip()
if dt:
rec["device_type"] = dt
except Exception:
pass
except Exception:
pass