Files
Borealis-Github-Replica/Data/Engine/builders/device_enrollment.py

132 lines
4.9 KiB
Python

"""Builder utilities for device enrollment payloads."""
from __future__ import annotations
import base64
from dataclasses import dataclass
from typing import Optional
from Data.Engine.domain.device_auth import DeviceFingerprint, sanitize_service_context
from Data.Engine.domain.device_enrollment import (
EnrollmentValidationError,
ProofChallenge,
)
from Data.Engine.integrations.crypto import keys as crypto_keys
# Public API of this module: the names exported by a star-import.
__all__ = [
"EnrollmentRequestBuilder",
"EnrollmentRequestInput",
"ProofChallengeBuilder",
]
@dataclass(frozen=True, slots=True)
class EnrollmentRequestInput:
"""Structured enrollment request payload ready for the service layer."""
hostname: str
enrollment_code: str
fingerprint: DeviceFingerprint
client_nonce: bytes
client_nonce_b64: str
agent_public_key_der: bytes
service_context: Optional[str]
class EnrollmentRequestBuilder:
    """Normalize agent enrollment JSON payloads into domain objects.

    Usage is fluent: feed the decoded request body to ``with_payload``,
    optionally attach a service context with ``with_service_context``, then
    call ``build`` to validate everything and obtain an
    ``EnrollmentRequestInput``.
    """

    # Decoded public keys shorter than this many bytes are rejected outright.
    _MIN_PUBLIC_KEY_DER_LENGTH = 10
    # Decoded client nonces shorter than this many bytes are rejected.
    _MIN_CLIENT_NONCE_LENGTH = 16

    def __init__(self) -> None:
        self._hostname: Optional[str] = None
        self._enrollment_code: Optional[str] = None
        self._agent_pubkey_b64: Optional[str] = None
        self._client_nonce_b64: Optional[str] = None
        self._service_context: Optional[str] = None

    def with_payload(
        self, payload: Optional[dict[str, object]]
    ) -> "EnrollmentRequestBuilder":
        """Capture the enrollment fields from a decoded JSON body.

        Args:
            payload: The decoded request body. ``None``, an empty dict, or any
                non-dict value (a JSON body may decode to a list, string or
                number) is treated as an empty payload.

        Returns:
            This builder, to allow call chaining.
        """
        # Anything that is not a JSON object has none of the expected keys.
        # Treating it as empty lets build() report a validation error instead
        # of this method failing with AttributeError on ``.get``.
        fields = payload if isinstance(payload, dict) else {}

        self._hostname = str(fields.get("hostname") or "").strip()
        self._enrollment_code = str(fields.get("enrollment_code") or "").strip()

        # The base64 fields are only accepted when they are real strings;
        # any other type is recorded as missing.
        agent_pubkey = fields.get("agent_pubkey")
        self._agent_pubkey_b64 = agent_pubkey if isinstance(agent_pubkey, str) else None

        client_nonce = fields.get("client_nonce")
        self._client_nonce_b64 = client_nonce if isinstance(client_nonce, str) else None
        return self

    def with_service_context(self, value: Optional[str]) -> "EnrollmentRequestBuilder":
        """Record the raw service context; it is sanitized in ``build``.

        Returns:
            This builder, to allow call chaining.
        """
        self._service_context = value
        return self

    def build(self) -> EnrollmentRequestInput:
        """Validate the collected fields and assemble the request object.

        Returns:
            A populated ``EnrollmentRequestInput``.

        Raises:
            EnrollmentValidationError: With one of the codes
                ``hostname_required``, ``enrollment_code_required``,
                ``agent_pubkey_required``, ``client_nonce_required``,
                ``invalid_agent_pubkey`` or ``invalid_client_nonce``.
        """
        # Presence checks, in the order the error codes are reported.
        if not self._hostname:
            raise EnrollmentValidationError("hostname_required")
        if not self._enrollment_code:
            raise EnrollmentValidationError("enrollment_code_required")
        if not self._agent_pubkey_b64:
            raise EnrollmentValidationError("agent_pubkey_required")
        if not self._client_nonce_b64:
            raise EnrollmentValidationError("client_nonce_required")

        # The key helper's failure modes are not visible from this module, so
        # every failure is mapped to a single validation error.
        try:
            agent_pubkey_der = crypto_keys.spki_der_from_base64(self._agent_pubkey_b64)
        except Exception as exc:  # pragma: no cover - invalid input path
            raise EnrollmentValidationError("invalid_agent_pubkey") from exc
        if len(agent_pubkey_der) < self._MIN_PUBLIC_KEY_DER_LENGTH:
            raise EnrollmentValidationError("invalid_agent_pubkey")

        # validate=True rejects characters outside the base64 alphabet.
        # binascii.Error is a ValueError subclass, so this covers every
        # failure b64decode can raise.
        try:
            client_nonce_bytes = base64.b64decode(self._client_nonce_b64, validate=True)
        except (ValueError, TypeError) as exc:  # pragma: no cover - invalid input path
            raise EnrollmentValidationError("invalid_client_nonce") from exc
        if len(client_nonce_bytes) < self._MIN_CLIENT_NONCE_LENGTH:
            raise EnrollmentValidationError("invalid_client_nonce")

        fingerprint_value = crypto_keys.fingerprint_from_spki_der(agent_pubkey_der)

        return EnrollmentRequestInput(
            hostname=self._hostname,
            enrollment_code=self._enrollment_code,
            fingerprint=DeviceFingerprint(fingerprint_value),
            client_nonce=client_nonce_bytes,
            client_nonce_b64=self._client_nonce_b64,
            agent_public_key_der=agent_pubkey_der,
            service_context=sanitize_service_context(self._service_context),
        )
class ProofChallengeBuilder:
    """Construct proof challenges during enrollment approval.

    Usage is fluent: supply both nonces and the fingerprint through the
    ``with_*`` methods, then call ``build``.
    """

    def __init__(self) -> None:
        self._server_nonce: Optional[bytes] = None
        self._client_nonce: Optional[bytes] = None
        self._fingerprint: Optional[DeviceFingerprint] = None

    def with_server_nonce(self, nonce: Optional[bytes]) -> "ProofChallengeBuilder":
        """Store the server nonce as ``bytes``; ``None`` is stored as ``b""``.

        Returns:
            This builder, to allow call chaining.
        """
        self._server_nonce = bytes(nonce or b"")
        return self

    def with_client_nonce(self, nonce: Optional[bytes]) -> "ProofChallengeBuilder":
        """Store the client nonce as ``bytes``; ``None`` is stored as ``b""``.

        Returns:
            This builder, to allow call chaining.
        """
        self._client_nonce = bytes(nonce or b"")
        return self

    def with_fingerprint(self, fingerprint: Optional[str]) -> "ProofChallengeBuilder":
        """Wrap a non-empty fingerprint string; anything falsy clears it.

        Returns:
            This builder, to allow call chaining.
        """
        self._fingerprint = DeviceFingerprint(fingerprint) if fingerprint else None
        return self

    def build(self) -> ProofChallenge:
        """Assemble the proof challenge from the collected values.

        Returns:
            A ``ProofChallenge`` carrying both nonces and the fingerprint.

        Raises:
            ValueError: If either nonce is missing or empty, or if no
                fingerprint was supplied.
        """
        # The setters turn None into b"", so comparing against None would let
        # a missing nonce through. Check for emptiness, which covers both the
        # never-set case and the None/empty case.
        if not self._server_nonce or not self._client_nonce:
            raise ValueError("both server and client nonces are required")
        if not self._fingerprint:
            raise ValueError("fingerprint is required")
        return ProofChallenge(
            client_nonce=self._client_nonce,
            server_nonce=self._server_nonce,
            fingerprint=self._fingerprint,
        )