Files
Borealis-Github-Replica/Data/Engine/domain/enrollment_admin.py

207 lines
7.3 KiB
Python

"""Administrative enrollment domain models."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Mapping, Optional
from Data.Engine.domain.device_auth import DeviceGuid, normalize_guid
__all__ = [
"EnrollmentCodeRecord",
"DeviceApprovalRecord",
"HostnameConflict",
]
def _parse_iso8601(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
raw = str(value).strip()
if not raw:
return None
try:
dt = datetime.fromisoformat(raw)
except Exception as exc: # pragma: no cover - defensive parsing
raise ValueError(f"invalid ISO8601 timestamp: {raw}") from exc
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _isoformat(value: Optional[datetime]) -> Optional[str]:
if value is None:
return None
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc).isoformat()
@dataclass(frozen=True, slots=True)
class EnrollmentCodeRecord:
"""Installer code metadata exposed to administrative clients."""
record_id: str
code: str
expires_at: datetime
max_uses: int
use_count: int
created_by_user_id: Optional[str]
used_at: Optional[datetime]
used_by_guid: Optional[DeviceGuid]
last_used_at: Optional[datetime]
@classmethod
def from_row(cls, row: Mapping[str, Any]) -> "EnrollmentCodeRecord":
record_id = str(row.get("id") or "").strip()
code = str(row.get("code") or "").strip()
if not record_id or not code:
raise ValueError("invalid enrollment install code record")
used_by = row.get("used_by_guid")
used_by_guid = DeviceGuid(str(used_by)) if used_by else None
return cls(
record_id=record_id,
code=code,
expires_at=_parse_iso8601(row.get("expires_at")) or datetime.now(tz=timezone.utc),
max_uses=int(row.get("max_uses") or 1),
use_count=int(row.get("use_count") or 0),
created_by_user_id=str(row.get("created_by_user_id") or "").strip() or None,
used_at=_parse_iso8601(row.get("used_at")),
used_by_guid=used_by_guid,
last_used_at=_parse_iso8601(row.get("last_used_at")),
)
def status(self, *, now: Optional[datetime] = None) -> str:
reference = now or datetime.now(tz=timezone.utc)
if self.use_count >= self.max_uses:
return "used"
if self.expires_at <= reference:
return "expired"
return "active"
def to_dict(self) -> dict[str, Any]:
return {
"id": self.record_id,
"code": self.code,
"expires_at": _isoformat(self.expires_at),
"max_uses": self.max_uses,
"use_count": self.use_count,
"created_by_user_id": self.created_by_user_id,
"used_at": _isoformat(self.used_at),
"used_by_guid": self.used_by_guid.value if self.used_by_guid else None,
"last_used_at": _isoformat(self.last_used_at),
"status": self.status(),
}
@dataclass(frozen=True, slots=True)
class HostnameConflict:
"""Existing device details colliding with a pending approval."""
guid: Optional[str]
ssl_key_fingerprint: Optional[str]
site_id: Optional[int]
site_name: str
fingerprint_match: bool
requires_prompt: bool
def to_dict(self) -> dict[str, Any]:
return {
"guid": self.guid,
"ssl_key_fingerprint": self.ssl_key_fingerprint,
"site_id": self.site_id,
"site_name": self.site_name,
"fingerprint_match": self.fingerprint_match,
"requires_prompt": self.requires_prompt,
}
@dataclass(frozen=True, slots=True)
class DeviceApprovalRecord:
"""Administrative projection of a device approval entry."""
record_id: str
reference: str
status: str
claimed_hostname: str
claimed_fingerprint: str
created_at: datetime
updated_at: datetime
enrollment_code_id: Optional[str]
guid: Optional[str]
approved_by_user_id: Optional[str]
approved_by_username: Optional[str]
client_nonce: str
server_nonce: str
hostname_conflict: Optional[HostnameConflict]
alternate_hostname: Optional[str]
conflict_requires_prompt: bool
fingerprint_match: bool
@classmethod
def from_row(
cls,
row: Mapping[str, Any],
*,
conflict: Optional[HostnameConflict] = None,
alternate_hostname: Optional[str] = None,
fingerprint_match: bool = False,
requires_prompt: bool = False,
) -> "DeviceApprovalRecord":
record_id = str(row.get("id") or "").strip()
reference = str(row.get("approval_reference") or "").strip()
hostname = str(row.get("hostname_claimed") or "").strip()
fingerprint = str(row.get("ssl_key_fingerprint_claimed") or "").strip().lower()
if not record_id or not reference or not hostname or not fingerprint:
raise ValueError("invalid device approval record")
guid_raw = normalize_guid(row.get("guid")) or None
return cls(
record_id=record_id,
reference=reference,
status=str(row.get("status") or "pending").strip().lower(),
claimed_hostname=hostname,
claimed_fingerprint=fingerprint,
created_at=_parse_iso8601(row.get("created_at")) or datetime.now(tz=timezone.utc),
updated_at=_parse_iso8601(row.get("updated_at")) or datetime.now(tz=timezone.utc),
enrollment_code_id=str(row.get("enrollment_code_id") or "").strip() or None,
guid=guid_raw,
approved_by_user_id=str(row.get("approved_by_user_id") or "").strip() or None,
approved_by_username=str(row.get("approved_by_username") or "").strip() or None,
client_nonce=str(row.get("client_nonce") or "").strip(),
server_nonce=str(row.get("server_nonce") or "").strip(),
hostname_conflict=conflict,
alternate_hostname=alternate_hostname,
conflict_requires_prompt=requires_prompt,
fingerprint_match=fingerprint_match,
)
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"id": self.record_id,
"approval_reference": self.reference,
"status": self.status,
"hostname_claimed": self.claimed_hostname,
"ssl_key_fingerprint_claimed": self.claimed_fingerprint,
"created_at": _isoformat(self.created_at),
"updated_at": _isoformat(self.updated_at),
"enrollment_code_id": self.enrollment_code_id,
"guid": self.guid,
"approved_by_user_id": self.approved_by_user_id,
"approved_by_username": self.approved_by_username,
"client_nonce": self.client_nonce,
"server_nonce": self.server_nonce,
"conflict_requires_prompt": self.conflict_requires_prompt,
"fingerprint_match": self.fingerprint_match,
}
if self.hostname_conflict is not None:
payload["hostname_conflict"] = self.hostname_conflict.to_dict()
if self.alternate_hostname:
payload["alternate_hostname"] = self.alternate_hostname
return payload