feat: add agent enrollment endpoints and nonce protections

This commit is contained in:
2025-10-17 17:00:11 -06:00
parent f2722a75af
commit 78a5d3d7f9
3 changed files with 530 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
"""
Short-lived nonce cache to defend against replay attacks during enrollment.
"""
from __future__ import annotations
import time
from threading import Lock
from typing import Dict
class NonceCache:
def __init__(self, ttl_seconds: float = 300.0) -> None:
self._ttl = ttl_seconds
self._entries: Dict[str, float] = {}
self._lock = Lock()
def consume(self, key: str) -> bool:
"""
Attempt to consume the nonce identified by `key`.
Returns True on first use within TTL, False if already consumed.
"""
now = time.monotonic()
with self._lock:
expire_at = self._entries.get(key)
if expire_at and expire_at > now:
return False
self._entries[key] = now + self._ttl
# Opportunistic cleanup to keep the dict small
stale = [nonce for nonce, expiry in self._entries.items() if expiry <= now]
for nonce in stale:
self._entries.pop(nonce, None)
return True