Security Updates

This commit is contained in:
2025-10-19 18:28:20 -06:00
parent 458ad7af4a
commit 4f4929e5cc

View File

@@ -130,12 +130,109 @@ def _resolve_agent_identity_dir(settings_dir: str, scope: str) -> str:
project_root = settings_path.parent
base = project_root / "Certificates" / "Agent"
target = base / "Identity"
if scope_name in {"SYSTEM", "CURRENTUSER"}:
target = target / scope_name
elif scope_name:
target = target / scope_name
meta_fp: Optional[str] = None
try:
meta_path = Path(settings_dir).resolve() / "access.meta.json"
if meta_path.is_file():
meta_data = json.loads(meta_path.read_text(encoding="utf-8"))
value = (meta_data.get("ssl_key_fingerprint") or "").strip().lower()
if value:
meta_fp = value
except Exception:
meta_fp = None
shared_target = base / "Identity"
try:
shared_target.mkdir(parents=True, exist_ok=True)
except Exception:
pass
if scope_name in {"SYSTEM", "CURRENTUSER"}:
private_name = "agent_identity_private.ed25519"
public_name = "agent_identity_public.ed25519"
shared_private = shared_target / private_name
shared_public = shared_target / public_name
legacy_dirs: List[Path] = []
for candidate_scope in ("SYSTEM", "CURRENTUSER"):
legacy_path = shared_target / candidate_scope
if legacy_path.is_dir():
legacy_dirs.append(legacy_path)
shared_fp = _fingerprint_from_public_file(shared_public)
if meta_fp and shared_fp and shared_fp == meta_fp:
return str(shared_target)
def _adopt_identity(source_dir: Path) -> bool:
if not source_dir.is_dir():
return False
src_private = source_dir / private_name
src_public = source_dir / public_name
if not src_private.exists() or not src_public.exists():
return False
moved = False
try:
if shared_private.exists():
shared_private.unlink()
except Exception:
pass
try:
if shared_public.exists():
shared_public.unlink()
except Exception:
pass
try:
src_private.replace(shared_private)
src_public.replace(shared_public)
moved = True
except Exception:
try:
shared_private.write_bytes(src_private.read_bytes())
shared_public.write_bytes(src_public.read_bytes())
moved = True
except Exception:
moved = False
if moved:
try:
if src_private.exists():
src_private.unlink()
except Exception:
pass
try:
if src_public.exists():
src_public.unlink()
except Exception:
pass
return moved
if meta_fp:
for candidate in legacy_dirs:
candidate_fp = _fingerprint_from_public_file(candidate / public_name)
if candidate_fp and candidate_fp == meta_fp:
if _adopt_identity(candidate):
return str(shared_target)
try:
if (
(not shared_private.exists() or not shared_public.exists())
and legacy_dirs
):
for candidate in legacy_dirs:
if _adopt_identity(candidate):
break
except Exception:
pass
for candidate in legacy_dirs:
try:
if candidate.is_dir() and not any(candidate.iterdir()):
candidate.rmdir()
except Exception:
pass
return str(shared_target)
target = shared_target / scope_name if scope_name else shared_target
try:
target.mkdir(parents=True, exist_ok=True)
except Exception:
@@ -301,6 +398,19 @@ def _fingerprint_der(public_der: bytes) -> str:
return digest.lower()
def _fingerprint_from_public_file(path: Path) -> Optional[str]:
try:
if not path or not path.is_file():
return None
data = path.read_text(encoding="utf-8").strip()
if not data:
return None
der = base64.b64decode(data)
return _fingerprint_der(der)
except Exception:
return None
@dataclass
class AgentIdentity:
private_key: ed25519.Ed25519PrivateKey