diff --git a/Data/Agent/agent.py b/Data/Agent/agent.py index d20c1b8..ac3d0ff 100644 --- a/Data/Agent/agent.py +++ b/Data/Agent/agent.py @@ -360,6 +360,21 @@ class _CrossProcessFileLock: pass +_GUID_FILE_LOCK: Optional[_CrossProcessFileLock] = None + + +@contextlib.contextmanager +def _acquire_guid_lock(*, timeout: float = 60.0): + global _GUID_FILE_LOCK + if _GUID_FILE_LOCK is None: + _GUID_FILE_LOCK = _CrossProcessFileLock(os.path.join(_settings_dir(), 'agent_guid.lock')) + _GUID_FILE_LOCK.acquire(timeout=timeout) + try: + yield + finally: + _GUID_FILE_LOCK.release() + + _ENROLLMENT_FILE_LOCK: Optional[_CrossProcessFileLock] = None @@ -387,30 +402,60 @@ def _key_store() -> AgentKeyStore: def _persist_agent_guid_local(guid: str): - guid = _normalize_agent_guid(guid) - if not guid: + _persist_agent_guid_local_internal(guid, assume_locked=False) + + +def _persist_agent_guid_local_internal(guid: str, *, assume_locked: bool) -> None: + normalized = _normalize_agent_guid(guid) + if not normalized: return - try: - _key_store().save_guid(guid) - except Exception as exc: - _log_agent(f'Unable to persist guid via key store: {exc}', fname='agent.error.log') - path = _agent_guid_path() - try: - directory = os.path.dirname(path) - if directory: - os.makedirs(directory, exist_ok=True) - existing = '' - if os.path.isfile(path): + + def _write(): + try: + _key_store().save_guid(normalized) + except Exception as exc: + _log_agent(f'Unable to persist guid via key store: {exc}', fname='agent.error.log') + path = _agent_guid_path() + try: + directory = os.path.dirname(path) + if directory: + os.makedirs(directory, exist_ok=True) + existing = '' + if os.path.isfile(path): + try: + with open(path, 'r', encoding='utf-8') as fh: + existing = fh.read().strip() + except Exception: + existing = '' + if existing != normalized: + with open(path, 'w', encoding='utf-8') as fh: + fh.write(normalized) + except Exception as exc: + _log_agent(f'Failed to persist agent GUID locally: {exc}', fname='agent.error.log') + + legacy_paths: List[str] = [] + try: + root = _find_project_root() + legacy_paths.extend( + [ + os.path.join(root, 'Agent', 'Borealis', 'agent_GUID'), + os.path.join(root, 'Agent', 'Settings', 'agent_GUID'), + ] + ) + except Exception: + pass + for legacy in legacy_paths: try: - with open(path, 'r', encoding='utf-8') as fh: - existing = fh.read().strip() + if legacy and os.path.isfile(legacy) and os.path.abspath(legacy) != os.path.abspath(path): + os.remove(legacy) except Exception: - existing = '' - if existing != guid: - with open(path, 'w', encoding='utf-8') as fh: - fh.write(guid) - except Exception as exc: - _log_agent(f'Failed to persist agent GUID locally: {exc}', fname='agent.error.log') + pass + + if assume_locked: + _write() + else: + with _acquire_guid_lock(): + _write() if not SYSTEM_SERVICE_MODE: # Reduce noisy Qt output and attempt to avoid Windows OleInitialize warnings @@ -834,6 +879,7 @@ class AgentHttpClient: self._cached_ssl_context: Optional[ssl.SSLContext] = None self._socketio_http_session = None self._socketio_session_mode: Optional[Tuple[str, Optional[str]]] = None + self._last_reload_state: Optional[Tuple[Optional[str], bool, bool, Optional[int]]] = None self.refresh_base_url() self._configure_verify() self._reload_tokens_from_disk() @@ -878,6 +924,7 @@ class AgentHttpClient: self.key_store.save_guid(normalized_guid) except Exception: pass + prev_state = self._last_reload_state self.guid = normalized_guid or None self.access_token = access_token if access_token else None self.refresh_token = refresh_token if refresh_token else None @@ -886,17 +933,28 @@ class AgentHttpClient: self.session.headers.update({"Authorization": f"Bearer {self.access_token}"}) else: self.session.headers.pop("Authorization", None) - try: - _log_agent( - "Reloaded tokens from disk " - f"guid={'yes' if self.guid else 'no'} " - f"access={'yes' if self.access_token else 'no'} " - f"refresh={'yes' if self.refresh_token else 'no'} " - f"expiry={self.access_expires_at}", - fname="agent.log", - ) - except Exception: - pass + if self.guid: + desired = _compose_agent_id(socket.gethostname(), self.guid, _get_context_label()) + existing = (CONFIG.data.get('agent_id') or '').strip() + if desired and existing != desired: + try: + _update_agent_id_for_guid(self.guid) + except Exception: + pass + state = (self.guid, bool(self.access_token), bool(self.refresh_token), self.access_expires_at) + if state != prev_state: + try: + _log_agent( + "Reloaded tokens from disk " + f"guid={'yes' if self.guid else 'no'} " + f"access={'yes' if self.access_token else 'no'} " + f"refresh={'yes' if self.refresh_token else 'no'} " + f"expiry={self.access_expires_at}", + fname="agent.log", + ) + except Exception: + pass + self._last_reload_state = state def auth_headers(self) -> Dict[str, str]: headers: Dict[str, str] = {} @@ -1181,6 +1239,7 @@ class AgentHttpClient: self._ensure_authenticated_locked() def _ensure_authenticated_locked(self) -> None: + self._reload_tokens_from_disk() self.refresh_base_url() if not self.guid or not self.refresh_token: self._perform_enrollment_locked() @@ -1447,6 +1506,18 @@ class AgentHttpClient: ) if resp.status_code in (401, 403): error_code, snippet = self._error_details(resp) + if resp.status_code == 403 and error_code == 'guid_mismatch': + try: + _log_agent( + "Refresh token request saw guid mismatch; reloading credentials from disk", + fname="agent.log", + ) + except Exception: + pass + self._reload_tokens_from_disk() + if self.access_token: + self.session.headers.update({"Authorization": f"Bearer {self.access_token}"}) + return if resp.status_code == 401 and self._should_retry_auth(resp.status_code, error_code): _log_agent( "Refresh token rejected; attempting re-enrollment" @@ -1585,29 +1656,39 @@ class AgentHttpClient: # HTTP helpers # ------------------------------------------------------------------ def post_json(self, path: str, payload: Optional[Dict[str, Any]] = None, *, require_auth: bool = True) -> Any: - if require_auth: - self.ensure_authenticated() - url = f"{self.base_url}{path}" - headers = self.auth_headers() - response = self.session.post(url, json=payload, headers=headers, timeout=30) - if response.status_code in (401, 403) and require_auth: - error_code, snippet = self._error_details(response) - if self._should_retry_auth(response.status_code, error_code): - self.clear_tokens() + attempt = 0 + max_attempts = 3 + while True: + if require_auth: self.ensure_authenticated() - headers = self.auth_headers() - response = self.session.post(url, json=payload, headers=headers, timeout=30) - else: + url = f"{self.base_url}{path}" + headers = self.auth_headers() + response = self.session.post(url, json=payload, headers=headers, timeout=30) + if require_auth and response.status_code in (401, 403): + error_code, snippet = self._error_details(response) + if response.status_code == 403 and error_code == 'guid_mismatch' and attempt < max_attempts: + attempt += 1 + self._reload_tokens_from_disk() + if self.guid: + try: + _update_agent_id_for_guid(self.guid) + except Exception: + pass + continue + if self._should_retry_auth(response.status_code, error_code) and attempt < max_attempts: + self.clear_tokens() + attempt += 1 + continue _log_agent( "Authenticated request rejected " f"path={path} status={response.status_code} error={error_code or ''}" f" body_snippet={snippet}", fname="agent.error.log", ) - response.raise_for_status() - if response.headers.get("Content-Type", "").lower().startswith("application/json"): - return response.json() - return response.text + response.raise_for_status() + if response.headers.get("Content-Type", "").lower().startswith("application/json"): + return response.json() + return response.text async def async_post_json( self, @@ -1675,26 +1756,51 @@ def _normalize_agent_guid(guid: str) -> str: def _read_agent_guid_from_disk() -> str: try: - ks_guid = _key_store().load_guid() - if ks_guid: - return _normalize_agent_guid(ks_guid) path = _agent_guid_path() - if os.path.isfile(path): - with open(path, 'r', encoding='utf-8') as fh: - value = fh.read() - return _normalize_agent_guid(value) + candidates = [path] + try: + root = _find_project_root() + legacy_candidates = [ + os.path.join(root, 'Agent', 'Borealis', 'agent_GUID'), + os.path.join(root, 'Agent', 'Settings', 'agent_GUID'), + ] + for candidate in legacy_candidates: + if candidate not in candidates: + candidates.append(candidate) + except Exception: + pass + for candidate in candidates: + if os.path.isfile(candidate): + try: + with open(candidate, 'r', encoding='utf-8') as fh: + value = fh.read() + except Exception: + value = '' + guid = _normalize_agent_guid(value) + if guid: + return guid except Exception: pass return '' def _ensure_agent_guid() -> str: - guid = _read_agent_guid_from_disk() - if guid: + with _acquire_guid_lock(): + guid = _read_agent_guid_from_disk() + if not guid: + try: + ks_guid = _key_store().load_guid() + except Exception: + ks_guid = None + guid = _normalize_agent_guid(ks_guid or '') + if not guid: + guid = str(uuid.uuid4()).upper() + _persist_agent_guid_local_internal(guid, assume_locked=True) + try: + _update_agent_id_for_guid(guid) + except Exception: + pass return guid - new_guid = str(uuid.uuid4()).upper() - _persist_agent_guid_local(new_guid) - return new_guid def _compose_agent_id(hostname: str, guid: str, context: str) -> str: