Files
Borealis-Github-Replica/Data/Agent/agent_info.py

846 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import datetime
import getpass
import ipaddress
import json
import os
import platform
import shutil
import socket
import string
import subprocess
import sys
import time

import aiohttp
import requests

try:
    import psutil  # type: ignore
except Exception:
    psutil = None  # graceful degradation if unavailable
# ---------------- Helpers for hidden subprocess on Windows ----------------
# True when the interpreter reports an NT-family OS.
IS_WINDOWS = os.name == "nt"

# Value passed as ``creationflags`` to subprocess on Windows so that child
# processes do not open a console window; 0 (no flags) everywhere else.
if IS_WINDOWS:
    CREATE_NO_WINDOW = 0x08000000
else:
    CREATE_NO_WINDOW = 0
def _run_hidden(cmd_list, timeout=None):
    """Execute *cmd_list* with captured text output and no console window on Windows.

    Args:
        cmd_list: Argument vector handed to ``subprocess.run``.
        timeout: Optional limit in seconds; omitted from the call when ``None``.

    Returns:
        The ``subprocess.CompletedProcess`` produced by ``subprocess.run``.
    """
    run_options = dict(capture_output=True, text=True)
    if IS_WINDOWS:
        run_options["creationflags"] = CREATE_NO_WINDOW
    if timeout is not None:
        run_options["timeout"] = timeout
    return subprocess.run(cmd_list, **run_options)
def _run_powershell_hidden(ps_cmd: str, timeout: int = 60):
    """Run a ``powershell -NoProfile -Command`` string fully hidden on Windows.

    The Windows PowerShell 5.x binary under ``%SystemRoot%`` is preferred; when
    that file does not exist (including on non-Windows hosts) the bare name
    ``powershell.exe`` is used and resolved through ``PATH``.

    Args:
        ps_cmd: PowerShell source passed verbatim to ``-Command``.
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.

    Returns:
        The ``subprocess.CompletedProcess`` returned by ``_run_hidden``.
    """
    # Build the path from components: the previous raw-string literal used
    # doubled backslashes, which produced a path with doubled separators.
    system_root = os.environ.get("SystemRoot", r"C:\Windows")
    ps = os.path.join(
        system_root, "System32", "WindowsPowerShell", "v1.0", "powershell.exe"
    )
    if not os.path.isfile(ps):
        ps = "powershell.exe"
    return _run_hidden([ps, "-NoProfile", "-Command", ps_cmd], timeout=timeout)
def detect_agent_os():
    """Return a user-friendly operating system name for this host.

    Examples of the strings produced:
        - "Microsoft Windows 11 Pro 24H2 Build 26100.1234"
        - "Fedora Workstation 42"
        - "Ubuntu 22.04 LTS"
        - "macOS Sonoma"

    Fallbacks:
        - Windows registry failure -> "Windows <platform.release()>"
        - Linux without the ``distro`` package (or with an empty name)
          -> "<platform.system()> <platform.release()>"
        - Unrecognised platform -> "Unknown OS (<system> <release>)"
        - Any unexpected error -> "Unknown" (a warning is printed)
    """
    try:
        plat = platform.system().lower()
        if plat.startswith('win'):
            try:
                return _windows_os_label()
            except Exception:
                return f"Windows {platform.release()}"
        if plat.startswith('linux'):
            try:
                import distro  # External package, better for Linux OS detection
            except ImportError:
                return f"{platform.system()} {platform.release()}"
            name = distro.name(pretty=True)  # e.g., "Fedora Workstation 42"
            return name or f"{platform.system()} {platform.release()}"
        if plat.startswith('darwin'):
            return _macos_os_label(platform.mac_ver()[0])
        return f"Unknown OS ({platform.system()} {platform.release()})"
    except Exception as e:
        print(f"[WARN] OS detection failed: {e}")
        return "Unknown"


# Marketing names keyed by macOS major version ("10.x" for the 10-series,
# whose marketing name changed with the minor version).
_MACOS_RELEASE_NAMES = {
    "15": "Sequoia",
    "14": "Sonoma",
    "13": "Ventura",
    "12": "Monterey",
    "11": "Big Sur",
    "10.15": "Catalina",
}

# Friendly labels for the registry ``EditionID`` value.
_WINDOWS_EDITION_LABELS = {
    "Professional": "Pro",
    "ProfessionalN": "Pro N",
    "ProfessionalEducation": "Pro Education",
    "ProfessionalWorkstation": "Pro for Workstations",
    "Enterprise": "Enterprise",
    "EnterpriseN": "Enterprise N",
    "EnterpriseS": "Enterprise LTSC",
    "Education": "Education",
    "EducationN": "Education N",
    "Core": "Home",
    "CoreN": "Home N",
    "CoreSingleLanguage": "Home Single Language",
    "IoTEnterprise": "IoT Enterprise",
}


def _macos_os_label(version):
    """Map a ``platform.mac_ver()`` version string to "macOS <name or version>"."""
    pieces = version.split(".")
    # From macOS 11 on the name follows the major version only, so "14.5"
    # must be looked up as "14"; the 10-series needs "10.<minor>".
    lookup_key = ".".join(pieces[:2]) if pieces[0] == "10" else pieces[0]
    pretty_name = _MACOS_RELEASE_NAMES.get(lookup_key, "")
    return f"macOS {pretty_name or version}"


def _windows_os_label():
    """Build "Microsoft Windows <major> <edition> <version> Build <n>" from the registry.

    Raises whatever ``winreg`` raises when the CurrentVersion key cannot be
    opened; the caller turns that into a generic label.
    """
    import winreg  # Only available on Windows

    reg_path = r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
    # Request the 64-bit registry view when the constant exists.
    access = winreg.KEY_READ | getattr(winreg, "KEY_WOW64_64KEY", 0)
    try:
        key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path, 0, access)
    except OSError:
        key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path, 0, winreg.KEY_READ)

    def _get(name, default=None):
        try:
            return winreg.QueryValueEx(key, name)[0]
        except Exception:
            return default

    # The handle is a context manager; close it as soon as the values are read.
    with key:
        product_name = _get("ProductName", "")        # e.g., "Windows 11 Pro"
        edition_id = _get("EditionID", "")            # e.g., "Professional"
        display_version = _get("DisplayVersion", "")  # e.g., "24H2" / "22H2"
        release_id = _get("ReleaseId", "")            # e.g., "2004" on older Windows 10
        build_number = _get("CurrentBuildNumber", "") or _get("CurrentBuild", "")
        ubr = _get("UBR", None)                       # Update Build Revision (int)

    try:
        build_int = int(str(build_number).split(".")[0]) if build_number else 0
    except Exception:
        build_int = 0

    # ProductName may still say "Windows 10" on Windows 11, so the major
    # label is derived from the build number instead.
    if build_int >= 22000:
        major_label = "11"
    elif build_int >= 10240:
        major_label = "10"
    else:
        major_label = platform.release()

    # Edition: everything after "Windows <major>" in ProductName, falling
    # back to a mapped EditionID.
    edition = ""
    pn = product_name or ""
    if pn.lower().startswith("windows "):
        tokens = pn.split()
        if len(tokens) >= 3:
            edition = " ".join(tokens[2:])
    if not edition and edition_id:
        edition = _WINDOWS_EDITION_LABELS.get(edition_id, edition_id)

    version_label = display_version or release_id or ""

    if isinstance(ubr, int):
        build_str = f"{build_number}.{ubr}" if build_number else str(ubr)
    else:
        try:
            if build_number and ubr is not None:
                build_str = f"{build_number}.{int(ubr)}"
            else:
                build_str = build_number
        except Exception:
            build_str = build_number

    parts = ["Microsoft", f"Windows {major_label}"]
    if edition:
        parts.append(edition)
    if version_label:
        parts.append(version_label)
    if build_str:
        parts.append(f"Build {build_str}")
    return " ".join(p for p in parts if p).strip()
def _detect_virtual_machine() -> bool:
    """Best-effort detection of whether this system is a virtual machine.

    Uses platform-specific signals and avoids heavy dependencies. Every probe
    is individually guarded, so the function returns ``False`` rather than
    raising when nothing can be determined.
    """
    try:
        plat = platform.system().lower()
        if plat == "linux":
            return _linux_looks_virtual()
        if plat == "windows":
            return _windows_looks_virtual()
        if plat == "darwin":
            return _macos_looks_virtual()
    except Exception:
        pass
    return False


# Substrings searched for in Linux DMI strings. "virtual machine" is the
# product name Hyper-V guests are expected to report; the vendor string
# "microsoft corporation" is deliberately NOT used because it would also match
# physical Microsoft hardware.
_DMI_VM_MARKERS = (
    "kvm", "vmware", "virtualbox", "qemu", "xen", "hyper-v", "virtual machine",
)

# Substrings searched for in Windows model/manufacturer and BIOS strings.
_WINDOWS_VM_MARKERS = ("virtual", "vmware", "virtualbox", "kvm", "qemu", "xen", "hyper-v")


def _linux_looks_virtual() -> bool:
    """Check systemd-detect-virt, then DMI sysfs strings, for hypervisor hints."""
    # Prefer systemd-detect-virt if available
    try:
        out = subprocess.run(
            ["systemd-detect-virt", "--vm"],
            capture_output=True, text=True, timeout=3,
        )
        if out.returncode == 0 and (out.stdout or "").strip():
            return True
    except Exception:
        pass
    # Fallback to DMI sysfs strings
    for path in (
        "/sys/class/dmi/id/product_name",
        "/sys/class/dmi/id/sys_vendor",
        "/sys/class/dmi/id/board_vendor",
    ):
        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as fh:
                text = (fh.read() or "").lower()
        except Exception:
            continue
        if any(marker in text for marker in _DMI_VM_MARKERS):
            return True
    return False


def _windows_looks_virtual() -> bool:
    """Check the CIM computer model/manufacturer, then registry BIOS strings."""
    # Inspect model/manufacturer via CIM
    try:
        ps_cmd = (
            "$cs = Get-CimInstance Win32_ComputerSystem; "
            "$model = [string]$cs.Model; $manu = [string]$cs.Manufacturer; "
            "Write-Output ($model + '|' + $manu)"
        )
        out = _run_powershell_hidden(ps_cmd, timeout=6)
        text = (out.stdout or "").strip().lower()
        if any(marker in text for marker in _WINDOWS_VM_MARKERS):
            return True
    except Exception:
        pass
    # Fallback: registry BIOS strings often include virtualization hints
    try:
        import winreg  # type: ignore
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"HARDWARE\DESCRIPTION\System") as k:
            for name in ("SystemBiosVersion", "VideoBiosVersion"):
                try:
                    val, _ = winreg.QueryValueEx(k, name)
                except Exception:
                    continue
                text = str(val).lower()
                # "kvm" is not part of the registry check, as before.
                if any(m in text for m in _WINDOWS_VM_MARKERS if m != "kvm"):
                    return True
    except Exception:
        pass
    return False


def _macos_looks_virtual() -> bool:
    """Limited heuristic for macOS guests based on sysctl values."""
    # kern.hv_vmm_present is expected to read "1" inside a guest on recent
    # macOS releases; a missing key just yields empty output.
    try:
        out = subprocess.run(
            ["sysctl", "-n", "kern.hv_vmm_present"],
            capture_output=True, text=True, timeout=3,
        )
        if (out.stdout or "").strip() == "1":
            return True
    except Exception:
        pass
    # Intel guests advertise the VMM CPU feature flag.
    try:
        out = subprocess.run(
            ["sysctl", "-n", "machdep.cpu.features"],
            capture_output=True, text=True, timeout=3,
        )
        if "vmm" in (out.stdout or "").lower():
            return True
    except Exception:
        pass
    return False
def _detect_device_type_non_vm() -> str:
    """Classify a non-VM device as 'Laptop', 'Desktop', or 'Server'.

    This is intentionally conservative: whenever the platform probes give no
    answer (or fail), 'Desktop' is returned.
    """
    try:
        plat = platform.system().lower()
        if plat == "windows":
            return _windows_device_type()
        if plat == "linux":
            return _linux_device_type()
        if plat == "darwin":
            return _macos_device_type()
    except Exception:
        pass
    return "Desktop"


# SMBIOS chassis type codes grouped by the category they are reported as.
_LAPTOP_CHASSIS = frozenset({8, 9, 10, 14, 30, 31})
_SERVER_CHASSIS = frozenset({17, 23})
_DESKTOP_CHASSIS = frozenset({3, 4, 5, 6, 7, 15, 16, 24, 35})


def _classify_chassis(chassis_types):
    """Return a category for the given chassis codes, or None when none match.

    Laptop codes win over server codes, which win over desktop codes.
    """
    codes = set(chassis_types)
    if codes & _LAPTOP_CHASSIS:
        return "Laptop"
    if codes & _SERVER_CHASSIS:
        return "Server"
    if codes & _DESKTOP_CHASSIS:
        return "Desktop"
    return None


def _windows_device_type() -> str:
    """Prefer PCSystemTypeEx, then PCSystemType, chassis types, battery presence."""
    try:
        ps_cmd = (
            "$cs = Get-CimInstance Win32_ComputerSystem; "
            "$typeEx = [int]($cs.PCSystemTypeEx); "
            "$type = [int]($cs.PCSystemType); "
            "$ch = (Get-CimInstance Win32_SystemEnclosure).ChassisTypes; "
            "$hasBatt = @(Get-CimInstance Win32_Battery).Count -gt 0; "
            "Write-Output ($typeEx.ToString() + '|' + $type.ToString() + '|' + "
            "([string]::Join(',', $ch)) + '|' + $hasBatt)"
        )
        out = _run_powershell_hidden(ps_cmd, timeout=6)
        # Output shape: "<typeEx>|<type>|<c1,c2,...>|<True/False>"
        fields = (out.stdout or "").strip().split("|")
        type_ex = int(fields[0]) if len(fields) > 0 and fields[0].isdigit() else None
        type_pc = int(fields[1]) if len(fields) > 1 and fields[1].isdigit() else None
        chassis = []
        if len(fields) > 2 and fields[2].strip():
            chassis = [int(t) for t in (c.strip() for c in fields[2].split(',')) if t.isdigit()]
        has_batt = len(fields) > 3 and fields[3].strip().lower() in ("true", "1")

        # PCSystemTypeEx per MS docs: 1 Desktop, 2 Mobile, 3 Workstation,
        # 4 Enterprise Server, 5 SOHO Server, 7 Performance Server, 8 Slate.
        # Slate (tablet) is reported as Laptop, matching how the tablet
        # chassis code (30) is treated.
        if type_ex in (4, 5, 7):
            return "Server"
        if type_ex in (2, 8):
            return "Laptop"
        if type_ex in (1, 3):
            return "Desktop"
        # Fallback to PCSystemType (same codes, without Slate)
        if type_pc in (4, 5, 7):
            return "Server"
        if type_pc == 2:
            return "Laptop"
        if type_pc in (1, 3):
            return "Desktop"
        # ChassisType mapping (DMTF)
        by_chassis = _classify_chassis(chassis)
        if by_chassis:
            return by_chassis
        if has_batt:
            return "Laptop"
    except Exception:
        pass
    return "Desktop"


def _linux_device_type() -> str:
    """Use hostnamectl's chassis field, then the DMI chassis code, then batteries."""
    # hostnamectl exposes chassis when available
    try:
        out = subprocess.run(["hostnamectl"], capture_output=True, text=True, timeout=3)
        for line in (out.stdout or "").splitlines():
            if ":" not in line:
                continue
            field, value = line.split(":", 1)
            if field.strip().lower() != "chassis":
                continue
            value = value.strip().lower()
            if "laptop" in value:
                return "Laptop"
            if "desktop" in value:
                return "Desktop"
            if "server" in value:
                return "Server"
            break  # chassis line found but not one of the known categories
    except Exception:
        pass
    # DMI chassis type numeric
    try:
        with open("/sys/class/dmi/id/chassis_type", "r", encoding="utf-8", errors="ignore") as fh:
            code = int((fh.read() or "").strip())
        by_chassis = _classify_chassis([code])
        if by_chassis:
            return by_chassis
    except Exception:
        pass
    # Battery presence heuristic
    try:
        if os.path.isdir("/sys/class/power_supply"):
            for name in os.listdir("/sys/class/power_supply"):
                if name.lower().startswith("bat"):
                    return "Laptop"
    except Exception:
        pass
    return "Desktop"


def _macos_device_type() -> str:
    """Treat hw.model values starting with "MacBook" as laptops, all else desktop."""
    try:
        out = subprocess.run(["sysctl", "-n", "hw.model"], capture_output=True, text=True, timeout=3)
        model = (out.stdout or "").strip()
        if model and model.lower().startswith("macbook"):
            return "Laptop"
        # iMac, Macmini, MacPro -> treat as Desktop
    except Exception:
        pass
    return "Desktop"
def detect_device_type() -> str:
    """Classify this host as 'Laptop', 'Desktop', 'Server', or 'Virtual Machine'.

    Virtual-machine detection takes precedence over the physical form-factor
    probe; if either probe raises, 'Desktop' is reported.
    """
    try:
        is_virtual = _detect_virtual_machine()
        device_type = "Virtual Machine" if is_virtual else _detect_device_type_non_vm()
    except Exception:
        device_type = "Desktop"
    return device_type
def _get_internal_ip():
    """Best-effort detection of the primary IPv4 address.

    Order of attempts:
      1) psutil.net_if_addrs(): first non-loopback, non-APIPA IPv4
      2) UDP connect trick to 8.8.8.8 (a datagram socket is connected, then
         its local address is read)
      3) Windows: PowerShell Get-NetIPAddress
      4) Linux/macOS: `ip -o -4 addr show`, then `hostname -I`

    Returns:
        The address as a string, or "unknown" when every attempt fails.
    """
    # 1) psutil interfaces
    try:
        if psutil:
            for addrs in (psutil.net_if_addrs() or {}).values():
                for a in addrs:
                    if getattr(a, "family", None) != socket.AF_INET:
                        continue
                    if _is_usable_ipv4(a.address):
                        return a.address
    except Exception:
        pass
    # 2) UDP connect trick
    try:
        # The context manager closes the socket even when connect() raises
        # (e.g. no route), which previously leaked the descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            ip = probe.getsockname()[0]
        if ip:
            return ip
    except Exception:
        pass
    plat = platform.system().lower()
    # 3) Windows PowerShell
    if plat == "windows":
        try:
            ps_cmd = (
                "Get-NetIPAddress -AddressFamily IPv4 | "
                "Where-Object { $_.IPAddress -and $_.IPAddress -notmatch '^169\\.254\\.' -and $_.IPAddress -notmatch '^127\\.' } | "
                "Sort-Object -Property PrefixLength | Select-Object -First 1 -ExpandProperty IPAddress"
            )
            out = _run_powershell_hidden(ps_cmd, timeout=20)
            val = (out.stdout or "").strip()
            if val:
                return val
        except Exception:
            pass
    # 4) Linux/macOS
    try:
        out = subprocess.run(["ip", "-o", "-4", "addr", "show"], capture_output=True, text=True, timeout=10)
        for line in (out.stdout or "").splitlines():
            parts = line.split()
            if len(parts) >= 4:
                # Fourth column is "<address>/<prefix>"
                ip = parts[3].split("/")[0]
                if _is_usable_ipv4(ip):
                    return ip
    except Exception:
        pass
    try:
        out = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=5)
        for ip in (out.stdout or "").split():
            if _is_usable_ipv4(ip):
                return ip
    except Exception:
        pass
    return "unknown"


def _is_usable_ipv4(ip):
    """Return True for a non-empty address that is not loopback, APIPA, or 0.0.0.0."""
    return bool(ip) and ip != "0.0.0.0" and not ip.startswith(("127.", "169.254."))
def collect_summary(config):
    """Collect the headline facts about this device.

    Args:
        config: Agent configuration object. This function reads and writes
            ``config.data`` (a mapping) and calls ``config._write()`` once
            when it has to stamp the "created" time.

    Returns:
        dict with keys ``hostname``, ``operating_system``, ``device_type``,
        ``last_user``, ``internal_ip``, ``external_ip``, ``last_reboot`` and
        ``created``. Values that cannot be determined are "unknown".
    """
    last_user = _describe_last_user()

    try:
        last_reboot = _detect_last_reboot()
    except Exception:
        last_reboot = "unknown"

    created = config.data.get("created")
    if not created:
        # First run: remember when this agent was first seen.
        created = time.strftime("%Y-%m-%d %H:%M:%S")
        config.data["created"] = created
        try:
            config._write()
        except Exception:
            pass

    # Only run the (potentially slow) detectors when the config has no stored
    # value; dict.get(key, detect()) would evaluate detect() on every call.
    operating_system = config.data.get("agent_operating_system") or detect_agent_os()
    device_type = config.data.get("device_type") or detect_device_type()

    return {
        "hostname": socket.gethostname(),
        "operating_system": operating_system,
        "device_type": device_type,
        "last_user": last_user,
        "internal_ip": _get_internal_ip(),
        "external_ip": _lookup_external_ip(),
        "last_reboot": last_reboot,
        "created": created,
    }


def _describe_last_user():
    r"""Return "<domain>\<user>" for the current user, or "unknown"."""
    try:
        username = getpass.getuser()
        # USERDOMAIN is read from the environment; the hostname stands in
        # when it is unset.
        domain = os.environ.get("USERDOMAIN") or socket.gethostname()
        return f"{domain}\\{username}" if username else "unknown"
    except Exception:
        return "unknown"


def _detect_last_reboot():
    """Return the boot time as "YYYY-MM-DD HH:MM:SS", or "unknown"."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    # First, prefer psutil if available
    if psutil:
        try:
            return time.strftime(stamp_format, time.localtime(psutil.boot_time()))
        except Exception:
            pass

    if platform.system().lower() == "windows":
        # WMIC prints e.g. "20240101120000.500000-300"; parse the part before
        # the dot. Any failure (missing tool, unparsable text) falls through
        # to the PowerShell query below.
        try:
            out = _run_hidden(["wmic", "os", "get", "lastbootuptime"], timeout=20)
            raw = "".join(out.stdout.splitlines()[1:]).strip()
            boot = datetime.datetime.strptime(raw.split(".")[0], "%Y%m%d%H%M%S")
            return boot.strftime(stamp_format)
        except Exception:
            pass
        # CIM query, formatted directly in PowerShell
        try:
            ps_cmd = (
                "(Get-CimInstance Win32_OperatingSystem).LastBootUpTime | "
                "ForEach-Object { (Get-Date -Date $_ -Format 'yyyy-MM-dd HH:mm:ss') }"
            )
            out = _run_powershell_hidden(ps_cmd, timeout=20)
            raw = (out.stdout or "").strip()
            if raw:
                return raw
        except Exception:
            pass
        return "unknown"

    try:
        out = subprocess.run(["uptime", "-s"], capture_output=True, text=True, timeout=10)
        val = out.stdout.strip()
        if val:
            return val
    except Exception:
        pass
    return "unknown"


def _lookup_external_ip():
    """Ask public echo services for this host's external IP; "unknown" on failure."""
    for url in ("https://api.ipify.org", "https://api64.ipify.org", "https://ifconfig.me/ip"):
        try:
            candidate = requests.get(url, timeout=5).text.strip()
            # Reject empty bodies and error pages: only a parseable IPv4/IPv6
            # address is accepted (ValueError moves on to the next service).
            ipaddress.ip_address(candidate)
        except Exception:
            continue
        return candidate
    return "unknown"
def collect_software():
    """Return up to 100 installed packages as ``{"name", "version"}`` dicts.

    Sources by platform:
        - Windows: ``wmic product``; when that is missing, fails, times out
          or yields nothing, the Uninstall registry keys via PowerShell.
        - Linux: ``dpkg-query``, then ``rpm`` when dpkg gives nothing.
        - Anything else: ``pip list`` of the running interpreter.

    Failures are reported with a printed warning and whatever was collected
    so far (possibly nothing) is returned.
    """
    items = []
    plat = platform.system().lower()
    try:
        if plat == "windows":
            items = _windows_software()
        elif plat == "linux":
            items = _linux_software()
        else:
            out = subprocess.run(
                [sys.executable, "-m", "pip", "list", "--format", "json"],
                capture_output=True, text=True, timeout=60,
            )
            data = json.loads((out.stdout or "").strip() or "[]")
            items = [{"name": pkg.get("name"), "version": pkg.get("version")} for pkg in data]
    except Exception as e:
        print(f"[WARN] collect_software failed: {e}")
    return items[:100]


def _parse_wmic_product_table(text):
    """Parse the fixed-width "Name  Version" table printed by ``wmic product``.

    Product names contain spaces, so rows are cut at the column where the
    header's "Version" label starts instead of being split on whitespace.
    Returns an empty list when the header is not recognised.
    """
    lines = [ln for ln in (text or "").splitlines() if ln.strip()]
    if not lines:
        return []
    header, rows = lines[0], lines[1:]
    version_col = header.find("Version")
    if version_col <= 0 or not header.lower().startswith("name"):
        return []
    parsed = []
    for row in rows:
        name = row[:version_col].strip()
        if name:
            parsed.append({"name": name, "version": row[version_col:].strip()})
    return parsed


def _windows_software():
    """Query WMIC first, then the Uninstall registry keys through PowerShell."""
    # NOTE(review): Win32_Product enumeration is slow and is reported to
    # trigger MSI consistency checks; consider making the registry query the
    # primary source.
    try:
        out = _run_hidden(["wmic", "product", "get", "name,version"], timeout=60)
        items = _parse_wmic_product_table(out.stdout)
    except Exception:
        items = []
    if items:
        return items

    ps_cmd = (
        "Get-ItemProperty "
        "'HKLM:\\Software\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\*',"
        "'HKLM:\\Software\\WOW6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\*' "
        "| Where-Object { $_.DisplayName } "
        "| Select-Object DisplayName,DisplayVersion "
        "| ConvertTo-Json"
    )
    out = _run_powershell_hidden(ps_cmd, timeout=60)
    data = json.loads((out.stdout or "").strip() or "[]")
    if isinstance(data, dict):
        # ConvertTo-Json emits a bare object when there is a single result.
        data = [data]
    return [
        {"name": pkg.get("DisplayName"), "version": pkg.get("DisplayVersion") or ""}
        for pkg in data
        if pkg.get("DisplayName")
    ]


def _linux_software():
    """Try dpkg-query, then rpm; both are asked for "<name>\\t<version>" lines."""
    queries = (
        ["dpkg-query", "-W", "-f=${Package}\t${Version}\n"],
        ["rpm", "-qa", "--queryformat", "%{NAME}\t%{VERSION}-%{RELEASE}\n"],
    )
    for cmd in queries:
        try:
            out = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        except (OSError, subprocess.TimeoutExpired):
            continue  # tool not installed or hung; try the next one
        items = []
        for line in (out.stdout or "").splitlines():
            if "\t" in line:
                name, version = line.split("\t", 1)
                items.append({"name": name, "version": version})
        if items:
            return items
    return []
def collect_memory():
    """Return one ``{"slot", "speed", "serial", "capacity"}`` dict per memory module.

    Sources by platform:
        - Windows: ``wmic memorychip``; when that is missing, fails or yields
          nothing, ``Win32_PhysicalMemory`` via PowerShell.
        - Linux: ``dmidecode -t 17``.

    When nothing was collected and psutil is available, a single "physical"
    entry carrying the total RAM in bytes is returned instead. Capacity is
    passed through as reported by the source (text or number).
    """
    entries = []
    plat = platform.system().lower()
    try:
        if plat == "windows":
            entries = _windows_memory()
        elif plat == "linux":
            entries = _linux_memory()
    except Exception as e:
        print(f"[WARN] collect_memory failed: {e}")
    if not entries:
        try:
            if psutil:
                vm = psutil.virtual_memory()
                entries.append({
                    "slot": "physical",
                    "speed": "unknown",
                    "serial": "unknown",
                    "capacity": vm.total,
                })
        except Exception:
            pass
    return entries


def _parse_wmic_memory_table(text):
    """Parse the fixed-width table printed by ``wmic memorychip get ...``.

    Values are sliced by the header's column offsets and looked up by column
    name, so the result does not depend on the order in which WMIC prints the
    columns (it is reported to sort them alphabetically) or on spaces inside
    a value such as "BANK 0". Returns [] when a needed column is missing.
    """
    lines = [ln for ln in (text or "").splitlines() if ln.strip()]
    if not lines:
        return []
    header = lines[0]
    # Record where each header label starts.
    columns = []
    cursor = 0
    for label in header.split():
        cursor = header.index(label, cursor)
        columns.append((label, cursor))
        cursor += len(label)
    if not {"BankLabel", "Speed", "SerialNumber", "Capacity"} <= {c[0] for c in columns}:
        return []

    entries = []
    for row in lines[1:]:
        values = {}
        for idx, (label, start) in enumerate(columns):
            end = columns[idx + 1][1] if idx + 1 < len(columns) else None
            values[label] = row[start:end].strip() or "unknown"
        entries.append({
            "slot": values["BankLabel"],
            "speed": values["Speed"],
            "serial": values["SerialNumber"],
            "capacity": values["Capacity"],
        })
    return entries


def _windows_memory():
    """Query WMIC first, then Win32_PhysicalMemory through PowerShell."""
    try:
        out = _run_hidden(
            ["wmic", "memorychip", "get", "BankLabel,Speed,SerialNumber,Capacity"],
            timeout=60,
        )
        entries = _parse_wmic_memory_table(out.stdout)
    except Exception:
        entries = []
    if entries:
        return entries

    ps_cmd = (
        "Get-CimInstance Win32_PhysicalMemory | "
        "Select-Object BankLabel,Speed,SerialNumber,Capacity | ConvertTo-Json"
    )
    out = _run_powershell_hidden(ps_cmd, timeout=60)
    data = json.loads((out.stdout or "").strip() or "[]")
    if isinstance(data, dict):
        # ConvertTo-Json emits a bare object when there is a single module.
        data = [data]

    def _value(stick, key):
        # JSON null and missing keys both become "unknown".
        val = stick.get(key)
        return "unknown" if val is None else val

    return [
        {
            "slot": _value(stick, "BankLabel"),
            "speed": str(_value(stick, "Speed")),
            "serial": _value(stick, "SerialNumber"),
            "capacity": _value(stick, "Capacity"),
        }
        for stick in data
    ]


def _linux_memory():
    """Parse ``dmidecode -t 17`` records, which are separated by blank lines."""
    out = subprocess.run(["dmidecode", "-t", "17"], capture_output=True, text=True, timeout=30)
    entries = []
    slot = speed = serial = capacity = None

    def _flush():
        entries.append({
            "slot": slot,
            "speed": speed or "unknown",
            "serial": serial or "unknown",
            "capacity": capacity or "unknown",
        })

    for line in (out.stdout or "").splitlines():
        line = line.strip()
        if line.startswith("Locator:"):
            slot = line.split(":", 1)[1].strip()
        elif line.startswith("Speed:"):
            speed = line.split(":", 1)[1].strip()
        elif line.startswith("Serial Number:"):
            serial = line.split(":", 1)[1].strip()
        elif line.startswith("Size:"):
            capacity = line.split(":", 1)[1].strip()
        elif not line and slot:
            # Blank line ends the current record.
            _flush()
            slot = speed = serial = capacity = None
    if slot:
        # Output did not end with a blank line; keep the last record.
        _flush()
    return entries
def collect_storage():
    """Return one dict per volume with usage figures.

    Each dict has the keys ``drive``, ``disk_type``, ``usage`` (percent),
    ``total``, ``free`` and ``used``; the three sizes are byte counts on
    every code path.

    Sources:
        - psutil when available (all platforms)
        - Windows without psutil: drive letters via ``shutil.disk_usage``,
          then ``wmic logicaldisk`` if no drive was found
        - Other platforms without psutil: ``df -kP``
    """
    disks = []
    plat = platform.system().lower()
    try:
        if psutil:
            for part in psutil.disk_partitions():
                try:
                    usage = psutil.disk_usage(part.mountpoint)
                except Exception:
                    continue  # e.g. an empty removable drive
                disks.append({
                    "drive": part.device,
                    "disk_type": "Removable" if "removable" in part.opts.lower() else "Fixed Disk",
                    "usage": usage.percent,
                    "total": usage.total,
                    "free": usage.free,
                    "used": usage.used,
                })
        elif plat == "windows":
            disks.extend(_windows_storage())
        else:
            disks.extend(_posix_storage())
    except Exception as e:
        print(f"[WARN] collect_storage failed: {e}")
    return disks


def _windows_storage():
    """Probe drive letters A-Z; fall back to WMIC when none is accessible."""
    disks = []
    for letter in string.ascii_uppercase:
        drive = f"{letter}:\\"
        if not os.path.exists(drive):
            continue
        try:
            usage = shutil.disk_usage(drive)
        except Exception:
            continue
        disks.append({
            "drive": drive,
            "disk_type": "Fixed Disk",
            "usage": (usage.used / usage.total * 100) if usage.total else 0,
            "total": usage.total,
            "free": usage.free,
            "used": usage.used,
        })
    if disks:
        return disks

    try:
        out = _run_hidden(["wmic", "logicaldisk", "get", "DeviceID,Size,FreeSpace"], timeout=60)
        rows = [l for l in out.stdout.splitlines() if l.strip()][1:]
        for row in rows:
            parts = row.split()
            if len(parts) < 3:
                continue  # drives without media report blank sizes
            # Columns are read as DeviceID, FreeSpace, Size.
            drive, free, size = parts[0], parts[1], parts[2]
            try:
                total = int(size)
                free_bytes = int(free)
            except ValueError:
                continue
            used = total - free_bytes
            disks.append({
                "drive": drive,
                "disk_type": "Fixed Disk",
                "usage": (used / total * 100) if total else 0,
                "total": total,
                "free": free_bytes,
                "used": used,
            })
    except Exception:
        pass
    return disks


def _posix_storage():
    """Parse ``df -kP`` (1024-byte blocks) and convert the sizes to bytes."""
    disks = []
    try:
        out = subprocess.run(["df", "-kP"], capture_output=True, text=True, timeout=30)
        for line in (out.stdout or "").splitlines()[1:]:
            # Filesystem, 1024-blocks, Used, Available, Capacity, Mounted on
            parts = line.split()
            if len(parts) < 6:
                continue
            try:
                total = int(parts[1]) * 1024
                used = int(parts[2]) * 1024
                free_bytes = int(parts[3]) * 1024
            except ValueError:
                continue
            try:
                usage = float(parts[4].rstrip('%'))
            except ValueError:
                usage = (used / total * 100) if total else 0
            disks.append({
                "drive": parts[0],
                "disk_type": "Mounted",
                "usage": usage,
                "total": total,
                "free": free_bytes,
                "used": used,
            })
    except Exception:
        pass
    return disks
def collect_network():
    """Return one ``{"adapter", "ips", "mac"}`` dict per network adapter.

    ``ips`` is always a flat list of IPv4 address strings and ``mac`` is
    "unknown" when it cannot be determined.

    Sources:
        - psutil when available (all platforms)
        - Windows without psutil: ``Get-NetIPConfiguration`` via PowerShell
        - Other platforms without psutil: ``ip -o -4 addr show`` (no MAC)

    Failures are reported with a printed warning and whatever was collected
    so far is returned.
    """
    adapters = []
    plat = platform.system().lower()
    try:
        if psutil:
            # AF_LINK marks the hardware-address entries; the ``object``
            # default never compares equal to a family, so "unknown" is used
            # when psutil lacks that constant.
            link_family = getattr(psutil, "AF_LINK", object)
            for name, addrs in psutil.net_if_addrs().items():
                ips = [a.address for a in addrs if getattr(a, "family", None) == socket.AF_INET]
                mac = next(
                    (a.address for a in addrs if getattr(a, "family", None) == link_family),
                    "unknown",
                )
                adapters.append({"adapter": name, "ips": ips, "mac": mac})
        elif plat == "windows":
            ps_cmd = (
                "Get-NetIPConfiguration | "
                "Select-Object InterfaceAlias,@{Name='IPv4';Expression={$_.IPv4Address.IPAddress}},"
                "@{Name='MAC';Expression={$_.NetAdapter.MacAddress}} | ConvertTo-Json"
            )
            out = _run_powershell_hidden(ps_cmd, timeout=60)
            data = json.loads((out.stdout or "").strip() or "[]")
            if isinstance(data, dict):
                # ConvertTo-Json emits a bare object for a single adapter.
                data = [data]
            for a in data:
                raw_ips = a.get("IPv4")
                # An adapter with several addresses serialises as a JSON
                # array; keep the result a flat list either way.
                if isinstance(raw_ips, (list, tuple)):
                    ips = [ip for ip in raw_ips if ip]
                elif raw_ips:
                    ips = [raw_ips]
                else:
                    ips = []
                adapters.append({
                    "adapter": a.get("InterfaceAlias") or "unknown",
                    "ips": ips,
                    "mac": a.get("MAC") or "unknown",
                })
        else:
            out = subprocess.run(["ip", "-o", "-4", "addr", "show"], capture_output=True, text=True, timeout=60)
            # One output line per address: group them so each adapter is
            # listed once, like the psutil path does.
            by_name = {}
            for line in (out.stdout or "").splitlines():
                parts = line.split()
                if len(parts) >= 4:
                    by_name.setdefault(parts[1], []).append(parts[3].split("/")[0])
            for name, ips in by_name.items():
                adapters.append({"adapter": name, "ips": ips, "mac": "unknown"})
    except Exception as e:
        print(f"[WARN] collect_network failed: {e}")
    return adapters
async def send_agent_details(agent_id, config):
    """Collect detailed agent data and POST it to the server every ~2 minutes.

    Runs until cancelled. Collection or delivery failures are printed as a
    warning and retried on the next cycle.

    Args:
        agent_id: Identifier sent as ``agent_id`` in the payload.
        config: Agent configuration object; ``config.data`` supplies
            ``borealis_server_url`` (default "http://localhost:5000") and is
            also handed to ``collect_summary``.
    """
    loop = asyncio.get_running_loop()
    request_timeout = aiohttp.ClientTimeout(total=10)
    while True:
        try:
            # The collectors spawn subprocesses and make blocking HTTP calls,
            # so they run in the default executor instead of on the event loop.
            # NOTE(review): collect_summary may call config._write() from that
            # worker thread on first run - confirm the config object tolerates it.
            details = await loop.run_in_executor(None, _collect_agent_details, config)
            base_url = str(config.data.get("borealis_server_url", "http://localhost:5000"))
            # Tolerate a configured URL that ends with "/".
            url = base_url.rstrip("/") + "/api/agent/details"
            payload = {
                "agent_id": agent_id,
                "hostname": details.get("summary", {}).get("hostname", socket.gethostname()),
                "details": details,
            }
            async with aiohttp.ClientSession(timeout=request_timeout) as session:
                async with session.post(url, json=payload) as response:
                    # Surface HTTP errors through the warning below.
                    response.raise_for_status()
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception subclass on
            # Python 3.7), otherwise the task could not be stopped.
            raise
        except Exception as e:
            print(f"[WARN] Failed to send agent details: {e}")
        # Report every ~2 minutes
        await asyncio.sleep(120)


def _collect_agent_details(config):
    """Run every collector synchronously and bundle the results."""
    return {
        "summary": collect_summary(config),
        "software": collect_software(),
        "memory": collect_memory(),
        "storage": collect_storage(),
        "network": collect_network(),
    }