Milestone towards Ansible Inventory Implementation

This commit is contained in:
2025-09-27 20:24:10 -06:00
parent e4ef065271
commit 030e857f76
2 changed files with 335 additions and 7 deletions

View File

@@ -495,6 +495,151 @@ else { 'Workstation' }
return ''
def _collect_last_user_string() -> str:
if not IS_WINDOWS:
return ''
try:
ps = r"""
$ErrorActionPreference = 'SilentlyContinue'
function Get-InteractiveUsers {
$users = @()
try {
$ls = Get-CimInstance Win32_LogonSession | Where-Object { $_.LogonType -in 2,10 }
foreach ($sess in $ls) {
$accs = Get-CimAssociatedInstance -InputObject $sess -Association Win32_LoggedOnUser -ResultClassName Win32_Account
foreach ($a in $accs) {
if (-not $a -or -not $a.Name) { continue }
$name = [string]$a.Name
$domain = [string]$a.Domain
if ($name -match '\$$') { continue }
if ($domain -eq 'NT AUTHORITY' -or $domain -eq 'NT SERVICE') { continue }
if ($name -like 'DWM-*' -or $name -like 'UMFD-*') { continue }
if ($domain) { $users += ("{0}\{1}" -f $domain,$name) } else { $users += $name }
}
}
} catch {}
$users | Sort-Object -Unique
}
function Get-QuserUsers {
$list=@()
try {
$q = (quser 2>$null) -split '\r?\n'
foreach ($line in $q) {
if (-not $line) { continue }
if ($line -match '^USERNAME') { continue }
$s = ($line -replace '^>','').Trim()
if (-not $s) { continue }
$parts = $s -split '\s+'
if ($parts.Length -lt 1) { continue }
$u = $parts[0]
if (-not $u) { continue }
if ($u -match '\$$') { continue }
if ($u -like 'DWM-*' -or $u -like 'UMFD-*') { continue }
$list += $u
}
} catch {}
$list | Sort-Object -Unique
}
$u1 = Get-InteractiveUsers
$u2 = Get-QuserUsers
$combined = @()
foreach ($u in $u1) { if ($combined -notcontains $u) { $combined += $u } }
foreach ($u in $u2) { if ($combined -notcontains $u) { $combined += $u } }
if ($combined.Count -eq 0) { 'No Users Logged In' } else { $combined -join ', ' }
"""
out = subprocess.run(["powershell", "-NoProfile", "-Command", ps], capture_output=True, text=True, timeout=25)
s = (out.stdout or '').strip()
# PowerShell may emit newlines; take first non-empty line result
if s:
for line in s.splitlines():
t = line.strip()
if t:
return t
return ''
except Exception:
return ''
def _build_details_fallback() -> dict:
# Construct a details object similar to Ansible playbook output
try:
summary = collect_summary(type('C', (), {'data': {}, '_write': lambda s: None})())
except Exception:
summary = {'hostname': socket.gethostname()}
# Normalize OS field
if summary.get('os') and not summary.get('operating_system'):
summary['operating_system'] = summary.get('os')
# Last reboot in UTC string
try:
if psutil and hasattr(psutil, 'boot_time'):
from datetime import datetime, timezone
summary['last_reboot'] = datetime.fromtimestamp(psutil.boot_time(), timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
except Exception:
pass
# Device type
try:
dt = detect_device_type()
if dt:
summary['device_type'] = dt
except Exception:
pass
# Network
network = collect_network()
try:
# Derive internal_ip from first private IPv4
def is_private(ip: str) -> bool:
return ip.startswith('10.') or ip.startswith('192.168.') or (ip.startswith('172.') and any(ip.startswith(f'172.{n}.') for n in list(range(16,32))))
ips = []
for a in network:
for ip in (a.get('ips') or []):
if ip and is_private(ip):
ips.append(ip)
summary['internal_ip'] = ips[0] if ips else (network[0]['ips'][0] if network and network[0].get('ips') else '')
except Exception:
pass
# External IP best-effort
try:
ext = ''
for url in ('https://api.ipify.org', 'https://checkip.amazonaws.com'):
try:
import urllib.request
with urllib.request.urlopen(url, timeout=3) as resp:
txt = (resp.read() or b'').decode('utf-8', errors='ignore').strip()
if txt and '\n' in txt:
txt = txt.split('\n', 1)[0].strip()
if '{' in txt:
try:
obj = json.loads(txt); txt = (obj.get('ip') or '').strip()
except Exception:
pass
if txt:
ext = txt; break
except Exception:
continue
if ext:
summary['external_ip'] = ext
except Exception:
pass
# Last user(s)
try:
last_user = _collect_last_user_string()
if last_user:
summary['last_user'] = last_user
except Exception:
pass
details = {
'summary': summary,
'software': collect_software(),
'memory': collect_memory(),
'storage': collect_storage(),
'network': network,
}
return details
class Role:
def __init__(self, ctx):
self.ctx = ctx
@@ -537,6 +682,14 @@ class Role:
need_refresh = (not self._last_details) or ((now - self._ansible_ts) > refresh_sec)
if need_refresh:
details = _run_ansible_audit(self.ctx)
if not details:
# Fallback collector when Ansible is unavailable
details = _build_details_fallback()
# Best-effort fill of missing/renamed fields so UI is happy
try:
details = self._normalize_details(details)
except Exception:
pass
if details:
self._last_details = details
self._ansible_ts = now
@@ -555,3 +708,78 @@ class Role:
except Exception:
pass
await asyncio.sleep(interval_sec)
def _normalize_details(self, details: dict) -> dict:
if not isinstance(details, dict):
return {}
details.setdefault('summary', {})
summary = details['summary']
# Map legacy 'os' to 'operating_system'
try:
if not summary.get('operating_system') and summary.get('os'):
summary['operating_system'] = summary.get('os')
except Exception:
pass
# Device type fallback
try:
dt = (summary.get('device_type') or '').strip()
if not dt:
dt = detect_device_type() or ''
if dt:
summary['device_type'] = dt
except Exception:
pass
# Internal IP fallback from network list
try:
if not summary.get('internal_ip'):
net = details.get('network') or []
ipv4s = []
for a in net:
for ip in (a.get('ips') or []):
try:
if ip and isinstance(ip, str) and ip.count('.') == 3 and not ip.startswith('169.254.') and ip != '127.0.0.1':
ipv4s.append(ip)
except Exception:
pass
summary['internal_ip'] = ipv4s[0] if ipv4s else ''
except Exception:
pass
# External IP best-effort (cache ~15 min) if still missing
try:
now = time.time()
ext = (summary.get('external_ip') or '').strip()
if not ext and (now - self._ext_ip_ts > 900):
# lightweight fetch without blocking forever
import urllib.request # lazy import
for url in (
'https://api.ipify.org',
'https://checkip.amazonaws.com',
):
try:
with urllib.request.urlopen(url, timeout=3) as resp:
txt = (resp.read() or b'').decode('utf-8', errors='ignore').strip()
if txt and '\n' in txt:
txt = txt.split('\n', 1)[0].strip()
# api.ipify.org returns plain IP by default; if JSON was served, handle a small case
if '{' in txt:
try:
obj = json.loads(txt)
txt = (obj.get('ip') or '').strip()
except Exception:
pass
if txt:
self._ext_ip = txt
self._ext_ip_ts = now
break
except Exception:
continue
if not ext and self._ext_ip:
summary['external_ip'] = self._ext_ip
except Exception:
pass
details['summary'] = summary
return details