Files
Borealis-Github-Replica/Data/Engine/services/enrollment/admin_service.py

114 lines
3.8 KiB
Python

"""Administrative helpers for enrollment workflows."""
from __future__ import annotations
import logging
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Optional
from Data.Engine.domain.enrollment_admin import DeviceApprovalRecord, EnrollmentCodeRecord
from Data.Engine.repositories.sqlite.enrollment_repository import SQLiteEnrollmentRepository
from Data.Engine.repositories.sqlite.user_repository import SQLiteUserRepository
__all__ = ["EnrollmentAdminService"]
class EnrollmentAdminService:
"""Expose administrative enrollment operations."""
_VALID_TTL_HOURS = {1, 3, 6, 12, 24}
def __init__(
self,
*,
repository: SQLiteEnrollmentRepository,
user_repository: SQLiteUserRepository,
logger: Optional[logging.Logger] = None,
clock: Optional[Callable[[], datetime]] = None,
) -> None:
self._repository = repository
self._users = user_repository
self._log = logger or logging.getLogger("borealis.engine.services.enrollment_admin")
self._clock = clock or (lambda: datetime.now(tz=timezone.utc))
# ------------------------------------------------------------------
# Enrollment install codes
# ------------------------------------------------------------------
def list_install_codes(self, *, status: Optional[str] = None) -> List[EnrollmentCodeRecord]:
return self._repository.list_install_codes(status=status, now=self._clock())
def create_install_code(
self,
*,
ttl_hours: int,
max_uses: int,
created_by: Optional[str],
) -> EnrollmentCodeRecord:
if ttl_hours not in self._VALID_TTL_HOURS:
raise ValueError("invalid_ttl")
normalized_max = self._normalize_max_uses(max_uses)
now = self._clock()
expires_at = now + timedelta(hours=ttl_hours)
record_id = str(uuid.uuid4())
code = self._generate_install_code()
created_by_identifier = None
if created_by:
created_by_identifier = self._users.resolve_identifier(created_by)
if not created_by_identifier:
created_by_identifier = created_by.strip() or None
record = self._repository.insert_install_code(
record_id=record_id,
code=code,
expires_at=expires_at,
created_by=created_by_identifier,
max_uses=normalized_max,
)
self._log.info(
"install code created id=%s ttl=%sh max_uses=%s",
record.record_id,
ttl_hours,
normalized_max,
)
return record
def delete_install_code(self, record_id: str) -> bool:
deleted = self._repository.delete_install_code_if_unused(record_id)
if deleted:
self._log.info("install code deleted id=%s", record_id)
return deleted
# ------------------------------------------------------------------
# Device approvals
# ------------------------------------------------------------------
def list_device_approvals(self, *, status: Optional[str] = None) -> List[DeviceApprovalRecord]:
return self._repository.list_device_approvals(status=status)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@staticmethod
def _generate_install_code() -> str:
raw = secrets.token_hex(16).upper()
return "-".join(raw[i : i + 4] for i in range(0, len(raw), 4))
@staticmethod
def _normalize_max_uses(value: int) -> int:
try:
count = int(value)
except Exception:
count = 2
if count < 1:
return 1
if count > 10:
return 10
return count