import os import sys import json import time import socket import platform import subprocess import shutil import string import asyncio import re from pathlib import Path try: import psutil # type: ignore except Exception: psutil = None try: import aiohttp except Exception: aiohttp = None ROLE_NAME = 'device_audit' ROLE_CONTEXTS = ['system'] IS_WINDOWS = os.name == 'nt' def detect_agent_os(): try: plat = platform.system().lower() if plat.startswith('win'): try: import winreg # type: ignore reg_path = r"SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion" access = getattr(winreg, 'KEY_READ', 0x20019) try: access |= winreg.KEY_WOW64_64KEY except Exception: pass try: key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path, 0, access) except OSError: key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path, 0, winreg.KEY_READ) def _get(name, default=None): try: return winreg.QueryValueEx(key, name)[0] except Exception: return default product_name = _get("ProductName", "") display_version = _get("DisplayVersion", "") release_id = _get("ReleaseId", "") build_number = _get("CurrentBuildNumber", "") or _get("CurrentBuild", "") ubr = _get("UBR", None) edition_id = _get("EditionID", "") wmi_info = {} try: cmd = "Get-CimInstance Win32_OperatingSystem | Select-Object Caption,ProductType,BuildNumber | ConvertTo-Json -Compress" out = subprocess.run( ["powershell", "-NoProfile", "-Command", cmd], capture_output=True, text=True, timeout=5, ) raw = (out.stdout or "").strip() if raw: data = json.loads(raw) if isinstance(data, list): data = data[0] if data else {} if isinstance(data, dict): wmi_info = data except Exception: wmi_info = {} wmi_caption = "" caption_val = wmi_info.get("Caption") if isinstance(caption_val, str): wmi_caption = caption_val.strip() if wmi_caption.lower().startswith("microsoft "): wmi_caption = wmi_caption[10:].strip() def _parse_int(value) -> int: try: return int(str(value).split(".")[0]) except Exception: return 0 build_int = 0 for candidate in (build_number, wmi_info.get("BuildNumber")): if candidate: parsed = _parse_int(candidate) if parsed: build_int = parsed break if not build_int: try: build_int = _parse_int(sys.getwindowsversion().build) # type: ignore[attr-defined] except Exception: build_int = 0 product_type_val = wmi_info.get("ProductType") if isinstance(product_type_val, str): try: product_type_val = int(product_type_val.strip()) except Exception: product_type_val = None if not isinstance(product_type_val, int): try: product_type_val = getattr(sys.getwindowsversion(), 'product_type', None) # type: ignore[attr-defined] except Exception: product_type_val = None if not isinstance(product_type_val, int): product_type_val = 0 is_server = False if product_type_val not in (0, 1): is_server = True elif product_type_val == 1: is_server = False else: if isinstance(product_name, str) and 'server' in product_name.lower(): is_server = True elif wmi_caption and 'server' in wmi_caption.lower(): is_server = True if is_server: if build_int >= 26100: family = "Windows Server 2025" elif build_int >= 20348: family = "Windows Server 2022" elif build_int >= 17763: family = "Windows Server 2019" else: family = "Windows Server" else: family = "Windows 11" if build_int >= 22000 else "Windows 10" if not family: family = (product_name or wmi_caption or "Windows").strip() def _extract_edition(source: str) -> str: if not isinstance(source, str): return "" text = source.strip() if not text: return "" lower = text.lower() if lower.startswith("microsoft "): text = text[len("Microsoft "):].strip() lower = text.lower() fam_words = family.split() source_words = text.split() i = 0 while i < len(fam_words) and i < len(source_words): if fam_words[i].lower() != source_words[i].lower(): break i += 1 if i < len(fam_words): return "" if i >= len(source_words): return "" suffix = " ".join(source_words[i:]).strip() if suffix.startswith("-"): suffix = suffix[1:].strip() return suffix def _edition_from_id(value: str, drop_server: bool) -> str: if not isinstance(value, str): return "" text = value.replace("_", " ") text = re.sub(r"(? start: return json.loads(txt[start:end+1]) except Exception: pass return None except Exception: return None def collect_software(): plat = platform.system().lower() if plat != 'windows': return [] # 1) Try PowerShell registry scrape (fast when ConvertTo-Json is available) try: ps = r""" $paths = @( 'HKLM:\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*', 'HKLM:\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\*', 'HKCU:\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*' ) $list = @() foreach ($p in $paths) { try { $list += Get-ItemProperty -Path $p -ErrorAction SilentlyContinue | Select-Object DisplayName, DisplayVersion } catch {} } $list = $list | Where-Object { $_.DisplayName -and ("$($_.DisplayName)".Trim().Length -gt 0) } $list | Sort-Object DisplayName -Unique | ConvertTo-Json -Depth 2 """ data = _ps_json(ps, timeout=120) out = [] if isinstance(data, dict): data = [data] for it in (data or []): name = str(it.get('DisplayName') or '').strip() if not name: continue ver = str(it.get('DisplayVersion') or '').strip() out.append({'name': name, 'version': ver}) if out: return out except Exception: pass # 2) Fallback: read registry directly via Python winreg (works on Win7+) try: try: import winreg # type: ignore except Exception: return [] def _enum_uninstall(root, path, wow_flag=0): items = [] try: key = winreg.OpenKey(root, path, 0, winreg.KEY_READ | wow_flag) except Exception: return items try: i = 0 while True: try: sub = winreg.EnumKey(key, i) except OSError: break i += 1 try: sk = winreg.OpenKey(key, sub, 0, winreg.KEY_READ | wow_flag) try: name, _ = winreg.QueryValueEx(sk, 'DisplayName') except Exception: name = '' if name and str(name).strip(): try: ver, _ = winreg.QueryValueEx(sk, 'DisplayVersion') except Exception: ver = '' items.append({'name': str(name).strip(), 'version': str(ver or '').strip()}) except Exception: continue except Exception: pass return items HKLM = getattr(winreg, 'HKEY_LOCAL_MACHINE') HKCU = getattr(winreg, 'HKEY_CURRENT_USER') WOW64_64 = getattr(winreg, 'KEY_WOW64_64KEY', 0) WOW64_32 = getattr(winreg, 'KEY_WOW64_32KEY', 0) paths = [ (HKLM, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall", WOW64_64), (HKLM, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall", WOW64_32), (HKLM, r"SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall", 0), (HKCU, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall", 0), ] merged = {} for root, path, flag in paths: for it in _enum_uninstall(root, path, flag): key = (it['name'] or '').lower() if not key: continue if key not in merged: merged[key] = it return sorted(merged.values(), key=lambda x: x['name']) except Exception: return [] def collect_memory(): entries = [] try: plat = platform.system().lower() if plat == 'windows': try: ps_cmd = ( "Get-CimInstance Win32_PhysicalMemory | " "Select-Object BankLabel,Speed,SerialNumber,Capacity | ConvertTo-Json" ) out = subprocess.run(["powershell", "-NoProfile", "-Command", ps_cmd], capture_output=True, text=True, timeout=60) data = json.loads(out.stdout or "[]") if isinstance(data, dict): data = [data] for stick in data: entries.append({ 'slot': stick.get('BankLabel', 'unknown'), 'speed': str(stick.get('Speed', 'unknown')), 'serial': stick.get('SerialNumber', 'unknown'), 'capacity': stick.get('Capacity', 'unknown'), }) except Exception: pass except Exception: pass if not entries and psutil: try: vm = psutil.virtual_memory() entries.append({'slot': 'physical', 'speed': 'unknown', 'serial': 'unknown', 'capacity': vm.total}) except Exception: pass return entries def collect_storage(): disks = [] try: if psutil: for part in psutil.disk_partitions(): try: usage = psutil.disk_usage(part.mountpoint) except Exception: continue disks.append({ 'drive': part.device, 'disk_type': 'Removable' if isinstance(part.opts, str) and 'removable' in part.opts.lower() else 'Fixed Disk', 'usage': usage.percent, 'total': usage.total, 'free': usage.free, 'used': usage.used, }) else: # Fallback basic detection on Windows via drive letters if IS_WINDOWS: for letter in string.ascii_uppercase: drive = f"{letter}:\\" if os.path.exists(drive): try: usage = shutil.disk_usage(drive) except Exception: continue disks.append({ 'drive': drive, 'disk_type': 'Fixed Disk', 'usage': (usage.used / usage.total * 100) if usage.total else 0, 'total': usage.total, 'free': usage.free, 'used': usage.used, }) except Exception: pass return disks def collect_network(): adapters = [] try: if IS_WINDOWS: # Try modern Get-NetIPAddress; fallback to ipconfig parsing (Win7) try: ps_cmd = ( "try { " "$ip = Get-NetIPAddress -AddressFamily IPv4 -ErrorAction Stop | " "Where-Object { $_.IPAddress -and $_.IPAddress -notmatch '^169\\.254\\.' -and $_.IPAddress -ne '127.0.0.1' }; " "$ad = Get-NetAdapter | ForEach-Object { $_ | Select-Object -Property InterfaceAlias, MacAddress, LinkSpeed }; " "$map = @{}; foreach($a in $ad){ $map[$a.InterfaceAlias] = @{ Mac=$a.MacAddress; LinkSpeed=('' + $a.LinkSpeed).Trim() } }; " "$out = @(); foreach($e in $ip){ $m = $map[$e.InterfaceAlias]; $mac = $m.Mac; $ls = $m.LinkSpeed; $out += [pscustomobject]@{ InterfaceAlias=$e.InterfaceAlias; IPAddress=$e.IPAddress; MacAddress=$mac; LinkSpeed=$ls } } " "$out | ConvertTo-Json -Depth 3 } catch { '' }" ) data = _ps_json(ps_cmd, timeout=60) if isinstance(data, dict): data = [data] tmp = {} for e in (data or []): alias = e.get('InterfaceAlias') or 'unknown' ip = e.get('IPAddress') or '' mac = e.get('MacAddress') or 'unknown' link = e.get('LinkSpeed') or '' if not ip: continue item = tmp.setdefault(alias, {'adapter': alias, 'ips': [], 'mac': mac, 'link_speed': link}) if ip not in item['ips']: item['ips'].append(ip) if tmp: adapters = list(tmp.values()) else: raise Exception('empty') except Exception: # Win7/older fallback: parse ipconfig try: out = subprocess.run(["ipconfig"], capture_output=True, text=True, timeout=30) cur = None for line in (out.stdout or '').splitlines(): s = line.strip() if not s: continue if s.endswith(":") and ('adapter' in s.lower() or 'ethernet' in s.lower() or 'wireless' in s.lower()): cur = {'adapter': s.replace(':','').strip(), 'ips': [], 'mac': 'unknown'} adapters.append(cur) continue if s.lower().startswith('ipv4 address') or s.lower().startswith('ipv4-adresse') or 'ipv4' in s.lower(): try: ip = s.split(':')[-1].strip() except Exception: ip = '' if ip and not ip.startswith('169.254.') and ip != '127.0.0.1' and cur: cur['ips'].append(ip) if s.lower().startswith('physical address') or s.lower().startswith('mac address'): try: mac = s.split(':')[-1].strip() except Exception: mac = '' if mac and cur: cur['mac'] = mac except Exception: pass else: out = subprocess.run(["ip", "-o", "-4", "addr", "show"], capture_output=True, text=True, timeout=60) for line in out.stdout.splitlines(): parts = line.split() if len(parts) >= 4: name = parts[1] ip = parts[3].split("/")[0] adapters.append({'adapter': name, 'ips': [ip], 'mac': 'unknown'}) except Exception: pass return adapters def collect_cpu() -> dict: """Collect CPU model, cores, and base clock (best-effort cross-platform).""" out: dict = {} try: plat = platform.system().lower() if plat == 'windows': try: ps_cmd = ( "Get-CimInstance Win32_Processor | " "Select-Object Name, NumberOfCores, NumberOfLogicalProcessors, MaxClockSpeed | ConvertTo-Json" ) data = _ps_json(ps_cmd, timeout=15) if isinstance(data, dict): data = [data] name = '' phys = 0 logi = 0 mhz = 0 for idx, cpu in enumerate(data or []): if idx == 0: name = str(cpu.get('Name') or '') try: mhz = int(cpu.get('MaxClockSpeed') or 0) except Exception: mhz = 0 try: phys += int(cpu.get('NumberOfCores') or 0) except Exception: pass try: logi += int(cpu.get('NumberOfLogicalProcessors') or 0) except Exception: pass out = { 'name': name.strip(), 'physical_cores': phys or None, 'logical_cores': logi or None, 'base_clock_ghz': (float(mhz) / 1000.0) if mhz else None, } return out except Exception: pass elif plat == 'darwin': try: name = subprocess.run(["sysctl", "-n", "machdep.cpu.brand_string"], capture_output=True, text=True, timeout=5).stdout.strip() except Exception: name = '' try: cores = int(subprocess.run(["sysctl", "-n", "hw.ncpu"], capture_output=True, text=True, timeout=5).stdout.strip() or '0') except Exception: cores = 0 out = {'name': name, 'logical_cores': cores or None} return out else: # Linux try: brand = '' cores = 0 with open('/proc/cpuinfo', 'r', encoding='utf-8', errors='ignore') as fh: for line in fh: if not brand and 'model name' in line: brand = line.split(':', 1)[-1].strip() if 'processor' in line: cores += 1 out = {'name': brand, 'logical_cores': cores or None} return out except Exception: pass except Exception: pass # psutil fallback try: if psutil: return { 'name': platform.processor() or '', 'physical_cores': psutil.cpu_count(logical=False) if hasattr(psutil, 'cpu_count') else None, 'logical_cores': psutil.cpu_count(logical=True) if hasattr(psutil, 'cpu_count') else None, } except Exception: pass return out or {} def detect_device_type(): try: plat = platform.system().lower() if plat != 'windows': return '' ps = r""" function _getCim($cls){ try { return Get-CimInstance $cls -ErrorAction Stop } catch { try { return Get-WmiObject -Class $cls -ErrorAction Stop } catch { return $null } } } $os = _getCim 'Win32_OperatingSystem' $cs = _getCim 'Win32_ComputerSystem' $caption = ""; if ($os) { $caption = [string]$os.Caption } $model = ""; if ($cs) { $model = [string]$cs.Model } $manu = ""; if ($cs) { $manu = [string]$cs.Manufacturer } $virt = $false if ($model -match 'Virtual' -or $manu -match 'Microsoft Corporation' -and $model -match 'Virtual Machine' -or $manu -match 'VMware' -or $manu -match 'innotek' -or $manu -match 'VirtualBox' -or $manu -match 'QEMU' -or $manu -match 'Xen' -or $manu -match 'Parallels') { $virt = $true } if ($virt) { 'Virtual Machine' } elseif ($caption -match 'Server') { 'Server' } else { 'Workstation' } """ out = subprocess.run(["powershell", "-NoProfile", "-Command", ps], capture_output=True, text=True, timeout=15) s = (out.stdout or '').strip() return s.splitlines()[0].strip() if s else '' except Exception: return '' def _collect_last_user_registry() -> str: if not IS_WINDOWS: return '' # Registry-first approach: LogonUI LastLoggedOnSAMUser / LastLoggedOnUser try: ps = r""" $ErrorActionPreference = 'SilentlyContinue' function Normalize-Sam([string]$s) { if ([string]::IsNullOrWhiteSpace($s)) { return '' } if ($s -match '\$$') { return '' } if ($s -like 'DWM-*' -or $s -like 'UMFD-*') { return '' } if ($s -eq 'SYSTEM' -or $s -eq 'LOCAL SERVICE' -or $s -eq 'NETWORK SERVICE' -or $s -eq 'ANONYMOUS LOGON') { return '' } return $s } $regPath = 'HKLM:\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Authentication\\LogonUI' $sam = ''; $upn = '' try { $sam = (Get-ItemProperty -Path $regPath -Name 'LastLoggedOnSAMUser' -ErrorAction Stop).LastLoggedOnSAMUser } catch {} try { $upn = (Get-ItemProperty -Path $regPath -Name 'LastLoggedOnUser' -ErrorAction Stop).LastLoggedOnUser } catch {} $user = Normalize-Sam $sam if (-not $user) { $user = Normalize-Sam $upn if ($user -and $user -like '*@*') { $domDns = (Get-WmiObject Win32_ComputerSystem).Domain $domShort = '' if ($domDns) { $domShort = ($domDns -split '\\.')[0].ToUpper() } $parts = $user -split '@' if ($parts.Length -ge 1) { $u = $parts[0] if ($domShort) { $user = "$domShort\\$u" } } } } if ($user) { $user } else { '' } """ out = subprocess.run(["powershell", "-NoProfile", "-Command", ps], capture_output=True, text=True, timeout=10) s = (out.stdout or '').strip() if s: return s.splitlines()[0].strip() except Exception: pass # Fallback to Python winreg lookup try: import winreg # type: ignore key_path = r"SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Authentication\\LogonUI" def _qval(name): try: k = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path, 0, winreg.KEY_READ | getattr(winreg, 'KEY_WOW64_64KEY', 0)) try: val, _ = winreg.QueryValueEx(k, name) finally: winreg.CloseKey(k) return str(val or '').strip() except Exception: return '' sam = _qval('LastLoggedOnSAMUser') upn = _qval('LastLoggedOnUser') def _ok(s: str) -> bool: if not s: return False su = s.upper() return not (s.endswith('$') or su in ('SYSTEM','LOCAL SERVICE','NETWORK SERVICE','ANONYMOUS LOGON') or s.startswith('DWM-') or s.startswith('UMFD-')) if _ok(sam): return sam if _ok(upn): if '@' in upn: try: user, dom = upn.split('@', 1) dom_short = (dom.split('.')[0] or dom).upper() return f"{dom_short}\\{user}" except Exception: pass return upn except Exception: pass return '' def _collect_last_user_string() -> str: if not IS_WINDOWS: return '' try: ps = r""" $ErrorActionPreference = 'SilentlyContinue' function Get-InteractiveUsers { $users = @() try { $ls = Get-CimInstance Win32_LogonSession | Where-Object { $_.LogonType -in 2,10 } foreach ($sess in $ls) { $accs = Get-CimAssociatedInstance -InputObject $sess -Association Win32_LoggedOnUser -ResultClassName Win32_Account foreach ($a in $accs) { if (-not $a -or -not $a.Name) { continue } $name = [string]$a.Name $domain = [string]$a.Domain if ($name -match '\$$') { continue } if ($domain -eq 'NT AUTHORITY' -or $domain -eq 'NT SERVICE') { continue } if ($name -like 'DWM-*' -or $name -like 'UMFD-*') { continue } if ($domain) { $users += ("{0}\{1}" -f $domain,$name) } else { $users += $name } } } } catch {} $users | Sort-Object -Unique } function Get-QuserUsers { $list=@() try { $q = (quser 2>$null) -split '\r?\n' foreach ($line in $q) { if (-not $line) { continue } if ($line -match '^USERNAME') { continue } $s = ($line -replace '^>','').Trim() if (-not $s) { continue } $parts = $s -split '\s+' if ($parts.Length -lt 1) { continue } $u = $parts[0] if (-not $u) { continue } if ($u -match '\$$') { continue } if ($u -like 'DWM-*' -or $u -like 'UMFD-*') { continue } $list += $u } } catch {} $list | Sort-Object -Unique } $u1 = Get-InteractiveUsers $u2 = Get-QuserUsers $combined = @() foreach ($u in $u1) { if ($combined -notcontains $u) { $combined += $u } } foreach ($u in $u2) { if ($combined -notcontains $u) { $combined += $u } } if ($combined.Count -eq 0) { 'No Users Logged In' } else { $combined -join ', ' } """ out = subprocess.run(["powershell", "-NoProfile", "-Command", ps], capture_output=True, text=True, timeout=25) s = (out.stdout or '').strip() # PowerShell may emit newlines; take first non-empty line result if s: for line in s.splitlines(): t = line.strip() if t: return t return '' except Exception: return '' def _build_details_fallback() -> dict: # Construct a details object similar to Ansible playbook output try: summary = collect_summary(type('C', (), {'data': {}, '_write': lambda s: None})()) except Exception: summary = {'hostname': socket.gethostname()} # Normalize OS field if summary.get('os') and not summary.get('operating_system'): summary['operating_system'] = summary.get('os') # Last reboot in UTC string try: if psutil and hasattr(psutil, 'boot_time'): from datetime import datetime, timezone summary['last_reboot'] = datetime.fromtimestamp(psutil.boot_time(), timezone.utc).strftime('%Y-%m-%d %H:%M:%S') except Exception: pass # Device type try: dt = detect_device_type() if dt: summary['device_type'] = dt except Exception: pass # Network network = collect_network() try: # Derive internal_ip from first private IPv4 def is_private(ip: str) -> bool: return ip.startswith('10.') or ip.startswith('192.168.') or (ip.startswith('172.') and any(ip.startswith(f'172.{n}.') for n in list(range(16,32)))) ips = [] for a in network: for ip in (a.get('ips') or []): if ip and is_private(ip): ips.append(ip) summary['internal_ip'] = ips[0] if ips else (network[0]['ips'][0] if network and network[0].get('ips') else '') except Exception: pass # External IP best-effort try: ext = '' for url in ('https://api.ipify.org', 'https://checkip.amazonaws.com'): try: import urllib.request with urllib.request.urlopen(url, timeout=3) as resp: txt = (resp.read() or b'').decode('utf-8', errors='ignore').strip() if txt and '\n' in txt: txt = txt.split('\n', 1)[0].strip() if '{' in txt: try: obj = json.loads(txt); txt = (obj.get('ip') or '').strip() except Exception: pass if txt: ext = txt; break except Exception: continue if ext: summary['external_ip'] = ext except Exception: pass # Last user(s) try: last_user = _collect_last_user_registry() if last_user: summary['last_user'] = last_user except Exception: pass # CPU information (summary + display string) try: cpu = collect_cpu() if cpu: summary['cpu'] = cpu cores = cpu.get('logical_cores') or cpu.get('physical_cores') ghz = cpu.get('base_clock_ghz') name = (cpu.get('name') or '').strip() parts = [] if name: parts.append(name) if ghz: try: parts.append(f"({float(ghz):.1f}GHz)") except Exception: pass if cores: parts.append(f"@ {int(cores)} Cores") if parts: summary['processor'] = ' '.join(parts) except Exception: pass # Total RAM (bytes) for quick UI metrics try: total = 0 mem_list = collect_memory() for m in (mem_list or []): try: total += int(m.get('capacity') or 0) except Exception: pass if not total and psutil: try: total = int(psutil.virtual_memory().total) except Exception: pass if total: summary['total_ram'] = total except Exception: pass details = { 'summary': summary, 'software': collect_software(), 'memory': collect_memory(), 'storage': collect_storage(), 'network': network, } return details class Role: def __init__(self, ctx): self.ctx = ctx self._ext_ip = None self._ext_ip_ts = 0 self._refresh_ts = 0 self._last_details = None # OS is collected dynamically; do not persist in config # Start periodic reporter try: self.task = self.ctx.loop.create_task(self._report_loop()) except Exception: self.task = None def stop_all(self): try: if self.task: self.task.cancel() except Exception: pass async def _report_loop(self): interval_sec = 300 # post heartbeat/details every 5 minutes while True: try: # Determine audit refresh interval (minutes), default 30 try: refresh_min = int(self.ctx.config.data.get('audit_interval_minutes', 30)) except Exception: refresh_min = 30 refresh_sec = max(300, refresh_min * 60) now = time.time() need_refresh = (not self._last_details) or ((now - self._refresh_ts) > refresh_sec) if need_refresh: # Always collect via built-in Python collectors details = _build_details_fallback() # Best-effort fill of missing/renamed fields so UI is happy try: details = self._normalize_details(details) except Exception: pass if details: self._last_details = details self._refresh_ts = now # Always post the latest available details (possibly cached) details_to_send = self._last_details or {'summary': collect_summary(self.ctx.config)} get_url = (self.ctx.hooks.get('get_server_url') if isinstance(self.ctx.hooks, dict) else None) or (lambda: 'http://localhost:5000') url = (get_url() or '').rstrip('/') + '/api/agent/details' payload = { 'agent_id': self.ctx.agent_id, 'hostname': details_to_send.get('summary', {}).get('hostname', socket.gethostname()), 'details': details_to_send, } if aiohttp is not None: async with aiohttp.ClientSession() as session: await session.post(url, json=payload, timeout=10) except Exception: pass await asyncio.sleep(interval_sec) def _normalize_details(self, details: dict) -> dict: if not isinstance(details, dict): return {} details.setdefault('summary', {}) summary = details['summary'] # Map legacy 'os' to 'operating_system' try: if not summary.get('operating_system') and summary.get('os'): summary['operating_system'] = summary.get('os') except Exception: pass # Device type fallback try: dt = (summary.get('device_type') or '').strip() if not dt: dt = detect_device_type() or '' if dt: summary['device_type'] = dt except Exception: pass # Internal IP fallback from network list try: if not summary.get('internal_ip'): net = details.get('network') or [] ipv4s = [] for a in net: for ip in (a.get('ips') or []): try: if ip and isinstance(ip, str) and ip.count('.') == 3 and not ip.startswith('169.254.') and ip != '127.0.0.1': ipv4s.append(ip) except Exception: pass summary['internal_ip'] = ipv4s[0] if ipv4s else '' except Exception: pass # External IP best-effort (cache ~15 min) if still missing try: now = time.time() ext = (summary.get('external_ip') or '').strip() if not ext and (now - self._ext_ip_ts > 900): # lightweight fetch without blocking forever import urllib.request # lazy import for url in ( 'https://api.ipify.org', 'https://checkip.amazonaws.com', ): try: with urllib.request.urlopen(url, timeout=3) as resp: txt = (resp.read() or b'').decode('utf-8', errors='ignore').strip() if txt and '\n' in txt: txt = txt.split('\n', 1)[0].strip() # api.ipify.org returns plain IP by default; if JSON was served, handle a small case if '{' in txt: try: obj = json.loads(txt) txt = (obj.get('ip') or '').strip() except Exception: pass if txt: self._ext_ip = txt self._ext_ip_ts = now break except Exception: continue if not ext and self._ext_ip: summary['external_ip'] = self._ext_ip except Exception: pass # Last reboot (UTC string) if missing/unknown try: val = (summary.get('last_reboot') or '').strip() if not val or val.lower() == 'unknown': if psutil and hasattr(psutil, 'boot_time'): from datetime import datetime, timezone summary['last_reboot'] = datetime.fromtimestamp(psutil.boot_time(), timezone.utc).strftime('%Y-%m-%d %H:%M:%S') elif IS_WINDOWS: ps = ( "$b=(Get-CimInstance Win32_OperatingSystem).LastBootUpTime; " "(Get-Date $b).ToUniversalTime().ToString('yyyy-MM-dd HH:mm:ss')" ) out = subprocess.run(["powershell", "-NoProfile", "-Command", ps], capture_output=True, text=True, timeout=10) s = (out.stdout or '').strip() if s: summary['last_reboot'] = s.splitlines()[0].strip() except Exception: pass # Last user fix-up: compute if missing/unknown or contains machine account entries try: lu = (summary.get('last_user') or '').strip() def _contains_machine_accounts(s: str) -> bool: try: for part in s.split(','): if part.strip().endswith('$'): return True except Exception: pass return False if (not lu) or (lu.lower() == 'unknown') or _contains_machine_accounts(lu): lu2 = _collect_last_user_registry().strip() summary['last_user'] = lu2 if lu2 else 'No Users Logged In' except Exception: pass details['summary'] = summary return details