Continued WireGuard troubleshooting

This commit is contained in:
2026-01-12 20:20:48 -07:00
parent 28fd0401e3
commit 4aff4ba8c0
3 changed files with 936 additions and 161 deletions

View File

@@ -22,9 +22,14 @@ import os
import subprocess
import threading
import time
import re
from pathlib import Path
from typing import Any, Dict, Optional
try:
import winreg # type: ignore
except Exception: # pragma: no cover - non-Windows guard
winreg = None
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import x25519
@@ -41,6 +46,9 @@ except Exception: # pragma: no cover - fallback for runtime path issues
ROLE_NAME = "WireGuardTunnel"
ROLE_CONTEXTS = ["system"]
TUNNEL_NAME = "Borealis"
TUNNEL_DISPLAY_NAME = "Borealis"
TUNNEL_IDLE_ADDRESS = "169.254.255.254/32"
def _log_path() -> Path:
@@ -130,14 +138,21 @@ class WireGuardClient:
self.cert_root = base / "Borealis" / "Certificates" / "VPN_Client"
self.temp_root = base / "Borealis" / "Temp"
self.temp_root.mkdir(parents=True, exist_ok=True)
self.conf_path = self.temp_root / "borealis-wg-client.conf"
self.service_name = "borealis-wg-client"
self.service_name = TUNNEL_NAME
self.display_name = TUNNEL_DISPLAY_NAME
self.conf_path = self._wireguard_config_path()
self.session: Optional[SessionConfig] = None
self.idle_deadline: Optional[float] = None
self._idle_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._session_lock = threading.Lock()
self._client_keys = _generate_client_keys(self.cert_root)
self._wg_exe = self._resolve_wireguard_exe()
self._last_install_already_present = False
try:
self._ensure_idle_service()
except Exception:
pass
def _resolve_wireguard_exe(self) -> str:
candidates = [
@@ -149,6 +164,148 @@ class WireGuardClient:
return candidate
return "wireguard.exe"
def _service_id(self) -> str:
return f"WireGuardTunnel${self.service_name}"
def _service_reg_path(self) -> str:
return f"SYSTEM\\CurrentControlSet\\Services\\{self._service_id()}"
def _service_reg_exists(self) -> bool:
if winreg is None:
return False
try:
winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, self._service_reg_path())
return True
except FileNotFoundError:
return False
except PermissionError:
_write_log("WireGuard service registry check denied; treating as present.")
return True
except Exception as exc:
_write_log(f"WireGuard service registry check failed: {exc}")
return False
def _service_image_path(self) -> Optional[str]:
if winreg is None:
return None
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, self._service_reg_path())
value, _ = winreg.QueryValueEx(key, "ImagePath")
return str(value) if value else None
except Exception:
return None
def _service_config_path(self) -> Optional[Path]:
image_path = self._service_image_path()
if not image_path:
return None
match = re.search(r'(?i)/tunnelservice\s+"([^"]+)"', image_path)
if match:
return Path(match.group(1))
match = re.search(r"(?i)/tunnelservice\s+(\S+)", image_path)
if match:
return Path(match.group(1))
return None
def _wireguard_config_path(self) -> Path:
settings_dir = self.temp_root.parent / "Settings" / "WireGuard"
candidates = [
settings_dir,
Path(os.environ.get("ProgramFiles", "C:\\Program Files")) / "WireGuard" / "Data" / "Configurations",
Path(os.environ.get("ProgramData", "C:\\ProgramData")) / "Borealis" / "WireGuard" / "Configurations",
self.temp_root,
]
for config_dir in candidates:
candidate = config_dir / f"{self.service_name}.conf"
if candidate.is_file():
return candidate
for config_dir in candidates:
try:
config_dir.mkdir(parents=True, exist_ok=True)
return config_dir / f"{self.service_name}.conf"
except Exception:
continue
return self.temp_root / f"{self.service_name}.conf"
def _write_config(self, text: str) -> bool:
return self._write_config_to(self.conf_path, text)
def _write_config_to(self, path: Path, text: str) -> bool:
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="ascii")
return True
except Exception as exc:
_write_log(f"Failed to write WireGuard config at {path}: {exc}")
return False
def _render_idle_config(self) -> str:
private_key = self._client_keys["private"]
return "\n".join(
[
"[Interface]",
f"PrivateKey = {private_key}",
f"Address = {TUNNEL_IDLE_ADDRESS}",
"ListenPort = 0",
]
)
def _service_exists(self) -> bool:
code, _, _ = self._run(["sc.exe", "query", self._service_id()])
if code == 0:
return True
return self._service_reg_exists()
def _install_service(self) -> bool:
code, out, err = self._run([self._wg_exe, "/installtunnelservice", str(self.conf_path)])
self._last_install_already_present = False
if code != 0:
if "already installed and running" in err.lower():
self._last_install_already_present = True
_write_log("WireGuard tunnel service already installed; skipping install.")
return True
if "access is denied" in err.lower():
_write_log("Failed to install WireGuard tunnel service: access denied; ensure agent runs elevated.")
return False
_write_log(f"Failed to install WireGuard tunnel service: code={code} err={err}")
return False
return True
def _restart_service(self) -> bool:
service_id = self._service_id()
stop_code, _, stop_err = self._run(["sc.exe", "stop", service_id])
if stop_code != 0 and stop_err:
_write_log(f"WireGuard stop service returned code={stop_code} err={stop_err}")
time.sleep(1)
start_code, _, start_err = self._run(["sc.exe", "start", service_id])
if start_code != 0 and start_err:
_write_log(f"WireGuard start service returned code={start_code} err={start_err}")
return start_code == 0
def _ensure_adapter_name(self) -> None:
if self.service_name == self.display_name:
return
args = [
"netsh.exe",
"interface",
"set",
"interface",
f'name="{self.service_name}"',
f'newname="{self.display_name}"',
]
self._run(args)
def _ensure_idle_service(self) -> None:
if self._service_exists():
return
if not Path(self._wg_exe).is_file():
return
idle_config = self._render_idle_config()
if not self._write_config(idle_config):
return
if self._install_service():
self._ensure_adapter_name()
def _validate_token(self, token: Dict[str, Any], *, signing_client: Optional[Any] = None) -> None:
payload = dict(token or {})
signature = payload.pop("signature", None)
@@ -232,43 +389,72 @@ class WireGuardClient:
self._idle_thread = t
def start_session(self, session: SessionConfig, *, signing_client: Optional[Any] = None) -> None:
if self.session:
_write_log("Rejecting start_session: existing session already active.")
return
with self._session_lock:
if self.session:
_write_log("Rejecting start_session: existing session already active.")
return
try:
self._validate_token(session.token, signing_client=signing_client)
except Exception as exc:
_write_log(f"Refusing to start WireGuard session: {exc}")
return
try:
self._validate_token(session.token, signing_client=signing_client)
except Exception as exc:
_write_log(f"Refusing to start WireGuard session: {exc}")
return
rendered = self._render_config(session)
self.conf_path.write_text(rendered, encoding="utf-8")
_write_log(f"Rendered WireGuard client config to {self.conf_path}")
rendered = self._render_config(session)
if not self._write_config(rendered):
_write_log("Failed to write WireGuard client config.")
return
_write_log(f"Rendered WireGuard client config to {self.conf_path}")
# Pre-stop any orphaned tunnel service using the same name
self.stop_session(reason="preflight", ignore_missing=True)
service_config_path = self._service_config_path()
if service_config_path and service_config_path != self.conf_path:
if self._write_config_to(service_config_path, rendered):
_write_log(f"Rendered WireGuard client config to service path {service_config_path}")
code, out, err = self._run([self._wg_exe, "/installtunnelservice", str(self.conf_path)])
if code != 0:
_write_log(f"Failed to install WireGuard client tunnel: code={code} err={err}")
return
if not self._service_exists():
if not self._install_service():
return
self.session = session
self.idle_deadline = time.time() + max(60, session.idle_seconds)
_write_log("WireGuard client session started; idle timer armed.")
self._start_idle_monitor()
service_present = self._service_exists()
if not service_present and self._last_install_already_present:
_write_log("WireGuard tunnel service presence inferred from install response.")
service_present = True
if not service_present:
_write_log("WireGuard tunnel service still missing after install attempt.")
return
self._restart_service()
self._ensure_adapter_name()
self.session = session
self.idle_deadline = time.time() + max(60, session.idle_seconds)
_write_log("WireGuard client session started; idle timer armed.")
self._start_idle_monitor()
def stop_session(self, reason: str = "stop", ignore_missing: bool = False) -> None:
code, out, err = self._run([self._wg_exe, "/uninstalltunnelservice", self.service_name])
if code != 0:
if not ignore_missing:
_write_log(f"Failed to uninstall WireGuard client tunnel: code={code} err={err}")
else:
_write_log(f"WireGuard client session stopped (reason={reason}).")
self.session = None
self.idle_deadline = None
self._stop_event.set()
with self._session_lock:
if not self._service_exists():
if not ignore_missing:
_write_log("WireGuard tunnel service not found when stopping session.")
self.session = None
self.idle_deadline = None
self._stop_event.set()
return
idle_config = self._render_idle_config()
wrote_idle = self._write_config(idle_config)
service_config_path = self._service_config_path()
if service_config_path and service_config_path != self.conf_path:
wrote_idle = self._write_config_to(service_config_path, idle_config) or wrote_idle
if wrote_idle:
self._restart_service()
self._ensure_adapter_name()
_write_log(f"WireGuard client session stopped (reason={reason}).")
elif not ignore_missing:
_write_log("Failed to write idle WireGuard config.")
self.session = None
self.idle_deadline = None
self._stop_event.set()
def bump_activity(self) -> None:
if self.session and self.idle_deadline: