diff --git a/Data/Agent/security.py b/Data/Agent/security.py index acf1d83..3597727 100644 --- a/Data/Agent/security.py +++ b/Data/Agent/security.py @@ -130,12 +130,109 @@ def _resolve_agent_identity_dir(settings_dir: str, scope: str) -> str: project_root = settings_path.parent base = project_root / "Certificates" / "Agent" - target = base / "Identity" - if scope_name in {"SYSTEM", "CURRENTUSER"}: - target = target / scope_name - elif scope_name: - target = target / scope_name + meta_fp: Optional[str] = None + try: + meta_path = Path(settings_dir).resolve() / "access.meta.json" + if meta_path.is_file(): + meta_data = json.loads(meta_path.read_text(encoding="utf-8")) + value = (meta_data.get("ssl_key_fingerprint") or "").strip().lower() + if value: + meta_fp = value + except Exception: + meta_fp = None + shared_target = base / "Identity" + try: + shared_target.mkdir(parents=True, exist_ok=True) + except Exception: + pass + + if scope_name in {"SYSTEM", "CURRENTUSER"}: + private_name = "agent_identity_private.ed25519" + public_name = "agent_identity_public.ed25519" + shared_private = shared_target / private_name + shared_public = shared_target / public_name + + legacy_dirs: List[Path] = [] + for candidate_scope in ("SYSTEM", "CURRENTUSER"): + legacy_path = shared_target / candidate_scope + if legacy_path.is_dir(): + legacy_dirs.append(legacy_path) + + shared_fp = _fingerprint_from_public_file(shared_public) + if meta_fp and shared_fp and shared_fp == meta_fp: + return str(shared_target) + + def _adopt_identity(source_dir: Path) -> bool: + if not source_dir.is_dir(): + return False + src_private = source_dir / private_name + src_public = source_dir / public_name + if not src_private.exists() or not src_public.exists(): + return False + moved = False + try: + if shared_private.exists(): + shared_private.unlink() + except Exception: + pass + try: + if shared_public.exists(): + shared_public.unlink() + except Exception: + pass + try: + src_private.replace(shared_private) + src_public.replace(shared_public) + moved = True + except Exception: + try: + shared_private.write_bytes(src_private.read_bytes()) + shared_public.write_bytes(src_public.read_bytes()) + moved = True + except Exception: + moved = False + if moved: + try: + if src_private.exists(): + src_private.unlink() + except Exception: + pass + try: + if src_public.exists(): + src_public.unlink() + except Exception: + pass + return moved + + if meta_fp: + for candidate in legacy_dirs: + candidate_fp = _fingerprint_from_public_file(candidate / public_name) + if candidate_fp and candidate_fp == meta_fp: + if _adopt_identity(candidate): + return str(shared_target) + + try: + if ( + (not shared_private.exists() or not shared_public.exists()) + and legacy_dirs + ): + for candidate in legacy_dirs: + if _adopt_identity(candidate): + break + except Exception: + pass + + for candidate in legacy_dirs: + try: + if candidate.is_dir() and not any(candidate.iterdir()): + candidate.rmdir() + except Exception: + pass + + return str(shared_target) + + target = shared_target / scope_name if scope_name else shared_target try: target.mkdir(parents=True, exist_ok=True) except Exception: @@ -301,6 +398,19 @@ def _fingerprint_der(public_der: bytes) -> str: return digest.lower() +def _fingerprint_from_public_file(path: Path) -> Optional[str]: + try: + if not path or not path.is_file(): + return None + data = path.read_text(encoding="utf-8").strip() + if not data: + return None + der = base64.b64decode(data) + return _fingerprint_der(der) + except Exception: + return None + + @dataclass class AgentIdentity: private_key: ed25519.Ed25519PrivateKey