Use Python disk_usage for Windows storage metrics

This commit is contained in:
2025-08-13 03:13:32 -06:00
parent 00792c5b10
commit 396b15aa4c

View File

@@ -17,6 +17,8 @@ import time # Heartbeat timestamps
import subprocess import subprocess
import getpass import getpass
import datetime import datetime
import shutil
import string
import requests import requests
try: try:
@@ -469,61 +471,79 @@ def collect_storage():
"used": usage.used, "used": usage.used,
}) })
elif plat == "windows": elif plat == "windows":
try: found = False
out = subprocess.run( for letter in string.ascii_uppercase:
["wmic", "logicaldisk", "get", "DeviceID,Size,FreeSpace"], drive = f"{letter}:\\"
capture_output=True, if os.path.exists(drive):
text=True, try:
timeout=60, usage = shutil.disk_usage(drive)
) except Exception:
lines = [l for l in out.stdout.splitlines() if l.strip()][1:] continue
for line in lines:
parts = line.split()
if len(parts) >= 3:
drive, free, size = parts[0], parts[1], parts[2]
try:
total = float(size)
free_bytes = float(free)
used = total - free_bytes
usage = (used / total * 100) if total else 0
disks.append({
"drive": drive,
"disk_type": "Fixed Disk",
"usage": usage,
"total": total,
"free": free_bytes,
"used": used,
})
except Exception:
pass
except FileNotFoundError:
ps_cmd = (
"Get-PSDrive -PSProvider FileSystem | "
"Select-Object Name,Free,Used,Capacity,Root | ConvertTo-Json"
)
out = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_cmd],
capture_output=True,
text=True,
timeout=60,
)
data = json.loads(out.stdout or "[]")
if isinstance(data, dict):
data = [data]
for d in data:
total = d.get("Capacity") or 0
used = d.get("Used") or 0
free_bytes = d.get("Free") or max(total - used, 0)
usage = (used / total * 100) if total else 0
drive = d.get("Root") or f"{d.get('Name','')}:"
disks.append({ disks.append({
"drive": drive, "drive": drive,
"disk_type": "Fixed Disk", "disk_type": "Fixed Disk",
"usage": usage, "usage": (usage.used / usage.total * 100) if usage.total else 0,
"total": total, "total": usage.total,
"free": free_bytes, "free": usage.free,
"used": used, "used": usage.used,
}) })
found = True
if not found:
try:
out = subprocess.run(
["wmic", "logicaldisk", "get", "DeviceID,Size,FreeSpace"],
capture_output=True,
text=True,
timeout=60,
)
lines = [l for l in out.stdout.splitlines() if l.strip()][1:]
for line in lines:
parts = line.split()
if len(parts) >= 3:
drive, free, size = parts[0], parts[1], parts[2]
try:
total = float(size)
free_bytes = float(free)
used = total - free_bytes
usage = (used / total * 100) if total else 0
disks.append({
"drive": drive,
"disk_type": "Fixed Disk",
"usage": usage,
"total": total,
"free": free_bytes,
"used": used,
})
except Exception:
pass
except FileNotFoundError:
ps_cmd = (
"Get-PSDrive -PSProvider FileSystem | "
"Select-Object Name,Free,Used,Capacity,Root | ConvertTo-Json"
)
out = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_cmd],
capture_output=True,
text=True,
timeout=60,
)
data = json.loads(out.stdout or "[]")
if isinstance(data, dict):
data = [data]
for d in data:
total = d.get("Capacity") or 0
used = d.get("Used") or 0
free_bytes = d.get("Free") or max(total - used, 0)
usage = (used / total * 100) if total else 0
drive = d.get("Root") or f"{d.get('Name','')}:"
disks.append({
"drive": drive,
"disk_type": "Fixed Disk",
"usage": usage,
"total": total,
"free": free_bytes,
"used": used,
})
else: else:
out = subprocess.run( out = subprocess.run(
["df", "-kP"], capture_output=True, text=True, timeout=60 ["df", "-kP"], capture_output=True, text=True, timeout=60