Share installer codes across agent contexts

This commit is contained in:
2025-10-18 03:41:29 -06:00
parent 8177cc0892
commit 64e0c05d66
3 changed files with 263 additions and 3 deletions

View File

@@ -230,6 +230,7 @@ class AgentKeyStore:
self._server_certificate_path = os.path.join(self.settings_dir, "server_certificate.pem")
self._server_signing_key_path = os.path.join(self.settings_dir, "server_signing_key.pub")
self._identity_lock_path = os.path.join(self.settings_dir, "identity.lock")
self._installer_cache_path = os.path.join(self.settings_dir, "installer_code.shared.json")
# ------------------------------------------------------------------
# Identity management
@@ -455,3 +456,107 @@ class AgentKeyStore:
if isinstance(value, str) and value.strip():
return value.strip()
return None
# ------------------------------------------------------------------
# Installer code sharing helpers
# ------------------------------------------------------------------
def _load_installer_cache(self) -> dict:
if not os.path.isfile(self._installer_cache_path):
return {}
try:
with open(self._installer_cache_path, "r", encoding="utf-8") as fh:
data = json.load(fh)
if isinstance(data, dict):
return data
except Exception:
pass
return {}
def _store_installer_cache(self, payload: dict) -> None:
try:
with open(self._installer_cache_path, "w", encoding="utf-8") as fh:
json.dump(payload, fh, indent=2)
_restrict_permissions(self._installer_cache_path)
except Exception:
pass
def cache_installer_code(self, code: str, consumer: Optional[str] = None) -> None:
normalized = (code or "").strip()
if not normalized:
return
payload = self._load_installer_cache()
payload["code"] = normalized
consumers = set()
existing = payload.get("consumed")
if isinstance(existing, list):
consumers = {str(item).upper() for item in existing if isinstance(item, str)}
if consumer:
consumers.add(str(consumer).upper())
payload["consumed"] = sorted(consumers)
payload["updated_at"] = int(time.time())
self._store_installer_cache(payload)
def load_cached_installer_code(self) -> Optional[str]:
payload = self._load_installer_cache()
code = payload.get("code")
if isinstance(code, str):
stripped = code.strip()
if stripped:
return stripped
return None
def mark_installer_code_consumed(self, consumer: Optional[str] = None) -> None:
payload = self._load_installer_cache()
if not payload:
return
consumers = set()
existing = payload.get("consumed")
if isinstance(existing, list):
consumers = {str(item).upper() for item in existing if isinstance(item, str)}
if consumer:
consumers.add(str(consumer).upper())
payload["consumed"] = sorted(consumers)
payload["updated_at"] = int(time.time())
code_present = isinstance(payload.get("code"), str) and payload["code"].strip()
should_clear = False
if not code_present:
should_clear = True
else:
required_consumers = {"SYSTEM", "CURRENTUSER"}
if required_consumers.issubset(consumers):
should_clear = True
else:
remaining = required_consumers - consumers
if not remaining:
should_clear = True
else:
exists_other = False
for other in remaining:
if other == "SYSTEM":
cfg_name = "agent_settings_SYSTEM.json"
elif other == "CURRENTUSER":
cfg_name = "agent_settings_CURRENTUSER.json"
else:
cfg_name = None
if not cfg_name:
continue
path = os.path.join(self.settings_dir, cfg_name)
if os.path.isfile(path):
exists_other = True
break
if not exists_other:
should_clear = True
if should_clear:
payload.pop("code", None)
payload["consumed"] = []
if payload.get("code") or payload.get("consumed"):
self._store_installer_cache(payload)
else:
try:
if os.path.isfile(self._installer_cache_path):
os.remove(self._installer_cache_path)
except Exception:
pass