diff --git a/Data/Engine/interfaces/http/__init__.py b/Data/Engine/interfaces/http/__init__.py index e388b81..fc88e26 100644 --- a/Data/Engine/interfaces/http/__init__.py +++ b/Data/Engine/interfaces/http/__init__.py @@ -6,7 +6,7 @@ from flask import Flask from Data.Engine.services.container import EngineServiceContainer -from . import admin, agents, auth, enrollment, github, health, job_management, tokens +from . import admin, agents, auth, enrollment, github, health, job_management, tokens, users _REGISTRARS = ( health.register, @@ -17,6 +17,7 @@ _REGISTRARS = ( github.register, auth.register, admin.register, + users.register, ) diff --git a/Data/Engine/interfaces/http/users.py b/Data/Engine/interfaces/http/users.py new file mode 100644 index 0000000..ae1d350 --- /dev/null +++ b/Data/Engine/interfaces/http/users.py @@ -0,0 +1,185 @@ +"""HTTP endpoints for operator account management.""" + +from __future__ import annotations + +from flask import Blueprint, Flask, jsonify, request, session + +from Data.Engine.services.auth import ( + AccountNotFoundError, + CannotModifySelfError, + InvalidPasswordHashError, + InvalidRoleError, + LastAdminError, + LastUserError, + OperatorAccountService, + UsernameAlreadyExistsError, +) +from Data.Engine.services.container import EngineServiceContainer + +blueprint = Blueprint("engine_users", __name__) + + +def register(app: Flask, services: EngineServiceContainer) -> None: + blueprint.services = services # type: ignore[attr-defined] + app.register_blueprint(blueprint) + + +def _services() -> EngineServiceContainer: + svc = getattr(blueprint, "services", None) + if svc is None: # pragma: no cover - defensive + raise RuntimeError("user blueprint not initialized") + return svc + + +def _accounts() -> OperatorAccountService: + return _services().operator_account_service + + +def _require_admin(): + username = session.get("username") + role = (session.get("role") or "").strip().lower() + if not isinstance(username, str) or not username: + return jsonify({"error": "not_authenticated"}), 401 + if role != "admin": + return jsonify({"error": "forbidden"}), 403 + return None + + +def _format_user(record) -> dict[str, object]: + return { + "username": record.username, + "display_name": record.display_name, + "role": record.role, + "last_login": record.last_login, + "created_at": record.created_at, + "updated_at": record.updated_at, + "mfa_enabled": 1 if record.mfa_enabled else 0, + } + + +@blueprint.route("/api/users", methods=["GET"]) +def list_users() -> object: + guard = _require_admin() + if guard: + return guard + + records = _accounts().list_accounts() + return jsonify({"users": [_format_user(record) for record in records]}) + + +@blueprint.route("/api/users", methods=["POST"]) +def create_user() -> object: + guard = _require_admin() + if guard: + return guard + + payload = request.get_json(silent=True) or {} + username = str(payload.get("username") or "").strip() + password_sha512 = str(payload.get("password_sha512") or "").strip() + role = str(payload.get("role") or "User") + display_name = str(payload.get("display_name") or username) + + try: + _accounts().create_account( + username=username, + password_sha512=password_sha512, + role=role, + display_name=display_name, + ) + except UsernameAlreadyExistsError as exc: + return jsonify({"error": str(exc)}), 409 + except (InvalidPasswordHashError, InvalidRoleError) as exc: + return jsonify({"error": str(exc)}), 400 + + return jsonify({"status": "ok"}) + + +@blueprint.route("/api/users/", methods=["DELETE"]) +def delete_user(username: str) -> object: + guard = _require_admin() + if guard: + return guard + + actor = session.get("username") if isinstance(session.get("username"), str) else None + + try: + _accounts().delete_account(username, actor=actor) + except CannotModifySelfError as exc: + return jsonify({"error": str(exc)}), 400 + except LastUserError as exc: + return jsonify({"error": str(exc)}), 400 + except LastAdminError as exc: + return jsonify({"error": str(exc)}), 400 + except AccountNotFoundError as exc: + return jsonify({"error": str(exc)}), 404 + + return jsonify({"status": "ok"}) + + +@blueprint.route("/api/users//reset_password", methods=["POST"]) +def reset_password(username: str) -> object: + guard = _require_admin() + if guard: + return guard + + payload = request.get_json(silent=True) or {} + password_sha512 = str(payload.get("password_sha512") or "").strip() + + try: + _accounts().reset_password(username, password_sha512) + except InvalidPasswordHashError as exc: + return jsonify({"error": str(exc)}), 400 + except AccountNotFoundError as exc: + return jsonify({"error": str(exc)}), 404 + + return jsonify({"status": "ok"}) + + +@blueprint.route("/api/users//role", methods=["POST"]) +def change_role(username: str) -> object: + guard = _require_admin() + if guard: + return guard + + payload = request.get_json(silent=True) or {} + role = str(payload.get("role") or "").strip() + actor = session.get("username") if isinstance(session.get("username"), str) else None + + try: + record = _accounts().change_role(username, role, actor=actor) + except InvalidRoleError as exc: + return jsonify({"error": str(exc)}), 400 + except LastAdminError as exc: + return jsonify({"error": str(exc)}), 400 + except AccountNotFoundError as exc: + return jsonify({"error": str(exc)}), 404 + + if actor and actor.strip().lower() == username.strip().lower(): + session["role"] = record.role + + return jsonify({"status": "ok"}) + + +@blueprint.route("/api/users//mfa", methods=["POST"]) +def update_mfa(username: str) -> object: + guard = _require_admin() + if guard: + return guard + + payload = request.get_json(silent=True) or {} + enabled = bool(payload.get("enabled", False)) + reset_secret = bool(payload.get("reset_secret", False)) + + try: + _accounts().update_mfa(username, enabled=enabled, reset_secret=reset_secret) + except AccountNotFoundError as exc: + return jsonify({"error": str(exc)}), 404 + + actor = session.get("username") if isinstance(session.get("username"), str) else None + if actor and actor.strip().lower() == username.strip().lower() and not enabled: + session.pop("mfa_pending", None) + + return jsonify({"status": "ok"}) + + +__all__ = ["register", "blueprint"] diff --git a/Data/Engine/repositories/sqlite/user_repository.py b/Data/Engine/repositories/sqlite/user_repository.py index 14708e5..9c61a4d 100644 --- a/Data/Engine/repositories/sqlite/user_repository.py +++ b/Data/Engine/repositories/sqlite/user_repository.py @@ -5,7 +5,7 @@ from __future__ import annotations import logging import sqlite3 from dataclasses import dataclass -from typing import Optional +from typing import Iterable, Optional from Data.Engine.domain import OperatorAccount @@ -64,23 +64,175 @@ class SQLiteUserRepository: if not row: return None record = _UserRow(*row) - return OperatorAccount( - username=record.username, - display_name=record.display_name or record.username, - password_sha512=(record.password_sha512 or "").lower(), - role=record.role or "User", - last_login=int(record.last_login or 0), - created_at=int(record.created_at or 0), - updated_at=int(record.updated_at or 0), - mfa_enabled=bool(record.mfa_enabled), - mfa_secret=(record.mfa_secret or "") or None, - ) + return _row_to_account(record) except sqlite3.Error as exc: # pragma: no cover - defensive self._log.error("failed to load user %s: %s", username, exc) return None finally: conn.close() + def list_accounts(self) -> list[OperatorAccount]: + conn = self._connection_factory() + try: + cur = conn.cursor() + cur.execute( + """ + SELECT + id, + username, + display_name, + COALESCE(password_sha512, '') as password_sha512, + COALESCE(role, 'User') as role, + COALESCE(last_login, 0) as last_login, + COALESCE(created_at, 0) as created_at, + COALESCE(updated_at, 0) as updated_at, + COALESCE(mfa_enabled, 0) as mfa_enabled, + COALESCE(mfa_secret, '') as mfa_secret + FROM users + ORDER BY LOWER(username) ASC + """ + ) + rows = [_UserRow(*row) for row in cur.fetchall()] + return [_row_to_account(row) for row in rows] + except sqlite3.Error as exc: # pragma: no cover - defensive + self._log.error("failed to enumerate users: %s", exc) + return [] + finally: + conn.close() + + def create_account( + self, + *, + username: str, + display_name: str, + password_sha512: str, + role: str, + timestamp: int, + ) -> None: + conn = self._connection_factory() + try: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO users ( + username, + display_name, + password_sha512, + role, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?) + """, + (username, display_name, password_sha512, role, timestamp, timestamp), + ) + conn.commit() + finally: + conn.close() + + def delete_account(self, username: str) -> bool: + conn = self._connection_factory() + try: + cur = conn.cursor() + cur.execute("DELETE FROM users WHERE LOWER(username) = LOWER(?)", (username,)) + deleted = cur.rowcount > 0 + conn.commit() + return deleted + except sqlite3.Error as exc: # pragma: no cover - defensive + self._log.error("failed to delete user %s: %s", username, exc) + return False + finally: + conn.close() + + def update_password(self, username: str, password_sha512: str, *, timestamp: int) -> bool: + conn = self._connection_factory() + try: + cur = conn.cursor() + cur.execute( + """ + UPDATE users + SET password_sha512 = ?, + updated_at = ? + WHERE LOWER(username) = LOWER(?) + """, + (password_sha512, timestamp, username), + ) + conn.commit() + return cur.rowcount > 0 + except sqlite3.Error as exc: # pragma: no cover - defensive + self._log.error("failed to update password for %s: %s", username, exc) + return False + finally: + conn.close() + + def update_role(self, username: str, role: str, *, timestamp: int) -> bool: + conn = self._connection_factory() + try: + cur = conn.cursor() + cur.execute( + """ + UPDATE users + SET role = ?, + updated_at = ? + WHERE LOWER(username) = LOWER(?) + """, + (role, timestamp, username), + ) + conn.commit() + return cur.rowcount > 0 + except sqlite3.Error as exc: # pragma: no cover - defensive + self._log.error("failed to update role for %s: %s", username, exc) + return False + finally: + conn.close() + + def update_mfa( + self, + username: str, + *, + enabled: bool, + reset_secret: bool, + timestamp: int, + ) -> bool: + conn = self._connection_factory() + try: + cur = conn.cursor() + secret_clause = "mfa_secret = NULL" if reset_secret else None + assignments: list[str] = ["mfa_enabled = ?", "updated_at = ?"] + params: list[object] = [1 if enabled else 0, timestamp] + if secret_clause is not None: + assignments.append(secret_clause) + query = "UPDATE users SET " + ", ".join(assignments) + " WHERE LOWER(username) = LOWER(?)" + params.append(username) + cur.execute(query, tuple(params)) + conn.commit() + return cur.rowcount > 0 + except sqlite3.Error as exc: # pragma: no cover - defensive + self._log.error("failed to update MFA for %s: %s", username, exc) + return False + finally: + conn.close() + + def count_accounts(self) -> int: + return self._scalar("SELECT COUNT(*) FROM users", ()) + + def count_admins(self) -> int: + return self._scalar("SELECT COUNT(*) FROM users WHERE LOWER(role) = 'admin'", ()) + + def _scalar(self, query: str, params: Iterable[object]) -> int: + conn = self._connection_factory() + try: + cur = conn.cursor() + cur.execute(query, tuple(params)) + row = cur.fetchone() + if not row: + return 0 + return int(row[0] or 0) + except sqlite3.Error as exc: # pragma: no cover - defensive + self._log.error("scalar query failed: %s", exc) + return 0 + finally: + conn.close() + def update_last_login(self, username: str, timestamp: int) -> None: conn = self._connection_factory() try: @@ -121,3 +273,17 @@ class SQLiteUserRepository: __all__ = ["SQLiteUserRepository"] + + +def _row_to_account(record: _UserRow) -> OperatorAccount: + return OperatorAccount( + username=record.username, + display_name=record.display_name or record.username, + password_sha512=(record.password_sha512 or "").lower(), + role=record.role or "User", + last_login=int(record.last_login or 0), + created_at=int(record.created_at or 0), + updated_at=int(record.updated_at or 0), + mfa_enabled=bool(record.mfa_enabled), + mfa_secret=(record.mfa_secret or "") or None, + ) diff --git a/Data/Engine/services/auth/__init__.py b/Data/Engine/services/auth/__init__.py index 98e66cd..103eb65 100644 --- a/Data/Engine/services/auth/__init__.py +++ b/Data/Engine/services/auth/__init__.py @@ -11,6 +11,18 @@ from .token_service import ( TokenRefreshErrorCode, TokenService, ) +from .operator_account_service import ( + AccountNotFoundError, + CannotModifySelfError, + InvalidPasswordHashError, + InvalidRoleError, + LastAdminError, + LastUserError, + OperatorAccountError, + OperatorAccountRecord, + OperatorAccountService, + UsernameAlreadyExistsError, +) from .operator_auth_service import ( InvalidCredentialsError, InvalidMFACodeError, @@ -32,6 +44,16 @@ __all__ = [ "TokenRefreshError", "TokenRefreshErrorCode", "TokenService", + "OperatorAccountService", + "OperatorAccountError", + "OperatorAccountRecord", + "UsernameAlreadyExistsError", + "AccountNotFoundError", + "LastAdminError", + "LastUserError", + "CannotModifySelfError", + "InvalidRoleError", + "InvalidPasswordHashError", "OperatorAuthService", "OperatorAuthError", "InvalidCredentialsError", diff --git a/Data/Engine/services/auth/operator_account_service.py b/Data/Engine/services/auth/operator_account_service.py new file mode 100644 index 0000000..b93d27f --- /dev/null +++ b/Data/Engine/services/auth/operator_account_service.py @@ -0,0 +1,211 @@ +"""Operator account management service.""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from typing import Optional + +from Data.Engine.domain import OperatorAccount +from Data.Engine.repositories.sqlite.user_repository import SQLiteUserRepository + + +class OperatorAccountError(Exception): + """Base class for operator account management failures.""" + + +class UsernameAlreadyExistsError(OperatorAccountError): + """Raised when attempting to create an operator with a duplicate username.""" + + +class AccountNotFoundError(OperatorAccountError): + """Raised when the requested operator account cannot be located.""" + + +class LastAdminError(OperatorAccountError): + """Raised when attempting to demote or delete the last remaining admin.""" + + +class LastUserError(OperatorAccountError): + """Raised when attempting to delete the final operator account.""" + + +class CannotModifySelfError(OperatorAccountError): + """Raised when the caller attempts to delete themselves.""" + + +class InvalidRoleError(OperatorAccountError): + """Raised when a role value is invalid.""" + + +class InvalidPasswordHashError(OperatorAccountError): + """Raised when a password hash is malformed.""" + + +@dataclass(frozen=True, slots=True) +class OperatorAccountRecord: + username: str + display_name: str + role: str + last_login: int + created_at: int + updated_at: int + mfa_enabled: bool + + +class OperatorAccountService: + """High-level operations for managing operator accounts.""" + + def __init__( + self, + repository: SQLiteUserRepository, + *, + logger: Optional[logging.Logger] = None, + ) -> None: + self._repository = repository + self._log = logger or logging.getLogger("borealis.engine.services.operator_accounts") + + def list_accounts(self) -> list[OperatorAccountRecord]: + return [_to_record(account) for account in self._repository.list_accounts()] + + def create_account( + self, + *, + username: str, + password_sha512: str, + role: str, + display_name: Optional[str] = None, + ) -> OperatorAccountRecord: + normalized_role = self._normalize_role(role) + username = (username or "").strip() + password_sha512 = (password_sha512 or "").strip().lower() + display_name = (display_name or username or "").strip() + + if not username or not password_sha512: + raise InvalidPasswordHashError("username and password are required") + if len(password_sha512) != 128: + raise InvalidPasswordHashError("password hash must be 128 hex characters") + + now = int(time.time()) + try: + self._repository.create_account( + username=username, + display_name=display_name or username, + password_sha512=password_sha512, + role=normalized_role, + timestamp=now, + ) + except Exception as exc: # pragma: no cover - sqlite integrity errors are deterministic + import sqlite3 + + if isinstance(exc, sqlite3.IntegrityError): + raise UsernameAlreadyExistsError("username already exists") from exc + raise + + account = self._repository.fetch_by_username(username) + if not account: # pragma: no cover - sanity guard + raise AccountNotFoundError("account creation failed") + return _to_record(account) + + def delete_account(self, username: str, *, actor: Optional[str] = None) -> None: + username = (username or "").strip() + if not username: + raise AccountNotFoundError("invalid username") + + if actor and actor.strip().lower() == username.lower(): + raise CannotModifySelfError("cannot delete yourself") + + total_accounts = self._repository.count_accounts() + if total_accounts <= 1: + raise LastUserError("cannot delete the last user") + + target = self._repository.fetch_by_username(username) + if not target: + raise AccountNotFoundError("user not found") + + if target.role.lower() == "admin" and self._repository.count_admins() <= 1: + raise LastAdminError("cannot delete the last admin") + + if not self._repository.delete_account(username): + raise AccountNotFoundError("user not found") + + def reset_password(self, username: str, password_sha512: str) -> None: + username = (username or "").strip() + password_sha512 = (password_sha512 or "").strip().lower() + if len(password_sha512) != 128: + raise InvalidPasswordHashError("invalid password hash") + + now = int(time.time()) + if not self._repository.update_password(username, password_sha512, timestamp=now): + raise AccountNotFoundError("user not found") + + def change_role(self, username: str, role: str, *, actor: Optional[str] = None) -> OperatorAccountRecord: + username = (username or "").strip() + normalized_role = self._normalize_role(role) + + account = self._repository.fetch_by_username(username) + if not account: + raise AccountNotFoundError("user not found") + + if account.role.lower() == "admin" and normalized_role.lower() != "admin": + if self._repository.count_admins() <= 1: + raise LastAdminError("cannot demote the last admin") + + now = int(time.time()) + if not self._repository.update_role(username, normalized_role, timestamp=now): + raise AccountNotFoundError("user not found") + + updated = self._repository.fetch_by_username(username) + if not updated: # pragma: no cover - guard + raise AccountNotFoundError("user not found") + + record = _to_record(updated) + if actor and actor.strip().lower() == username.lower(): + self._log.info("actor-role-updated", extra={"username": username, "role": record.role}) + return record + + def update_mfa(self, username: str, *, enabled: bool, reset_secret: bool) -> None: + username = (username or "").strip() + if not username: + raise AccountNotFoundError("invalid username") + + now = int(time.time()) + if not self._repository.update_mfa(username, enabled=enabled, reset_secret=reset_secret, timestamp=now): + raise AccountNotFoundError("user not found") + + def fetch_account(self, username: str) -> Optional[OperatorAccountRecord]: + account = self._repository.fetch_by_username(username) + return _to_record(account) if account else None + + def _normalize_role(self, role: str) -> str: + normalized = (role or "").strip().title() or "User" + if normalized not in {"User", "Admin"}: + raise InvalidRoleError("invalid role") + return normalized + + +def _to_record(account: OperatorAccount) -> OperatorAccountRecord: + return OperatorAccountRecord( + username=account.username, + display_name=account.display_name or account.username, + role=account.role or "User", + last_login=int(account.last_login or 0), + created_at=int(account.created_at or 0), + updated_at=int(account.updated_at or 0), + mfa_enabled=bool(account.mfa_enabled), + ) + + +__all__ = [ + "OperatorAccountService", + "OperatorAccountError", + "UsernameAlreadyExistsError", + "AccountNotFoundError", + "LastAdminError", + "LastUserError", + "CannotModifySelfError", + "InvalidRoleError", + "InvalidPasswordHashError", + "OperatorAccountRecord", +] diff --git a/Data/Engine/services/container.py b/Data/Engine/services/container.py index 714b686..621e02a 100644 --- a/Data/Engine/services/container.py +++ b/Data/Engine/services/container.py @@ -22,6 +22,7 @@ from Data.Engine.repositories.sqlite import ( from Data.Engine.services.auth import ( DeviceAuthService, DPoPValidator, + OperatorAccountService, OperatorAuthService, JWTService, TokenService, @@ -49,6 +50,7 @@ class EngineServiceContainer: scheduler_service: SchedulerService github_service: GitHubService operator_auth_service: OperatorAuthService + operator_account_service: OperatorAccountService def build_service_container( @@ -114,6 +116,10 @@ def build_service_container( repository=user_repo, logger=log.getChild("operator_auth"), ) + operator_account_service = OperatorAccountService( + repository=user_repo, + logger=log.getChild("operator_accounts"), + ) github_provider = GitHubArtifactProvider( cache_file=settings.github.cache_file, @@ -139,6 +145,7 @@ def build_service_container( scheduler_service=scheduler_service, github_service=github_service, operator_auth_service=operator_auth_service, + operator_account_service=operator_account_service, ) diff --git a/Data/Engine/tests/test_http_auth.py b/Data/Engine/tests/test_http_auth.py index b5fc39c..2f811c6 100644 --- a/Data/Engine/tests/test_http_auth.py +++ b/Data/Engine/tests/test_http_auth.py @@ -4,6 +4,7 @@ from pathlib import Path import pytest pytest.importorskip("flask") +pytest.importorskip("jwt") from Data.Engine.config.environment import ( DatabaseSettings, diff --git a/Data/Engine/tests/test_http_users.py b/Data/Engine/tests/test_http_users.py new file mode 100644 index 0000000..e30fadb --- /dev/null +++ b/Data/Engine/tests/test_http_users.py @@ -0,0 +1,120 @@ +"""HTTP integration tests for operator account endpoints.""" + +from __future__ import annotations + +import hashlib + +from .test_http_auth import _login, prepared_app + + +def test_list_users_requires_authentication(prepared_app): + client = prepared_app.test_client() + resp = client.get("/api/users") + assert resp.status_code == 401 + + +def test_list_users_returns_accounts(prepared_app): + client = prepared_app.test_client() + _login(client) + + resp = client.get("/api/users") + assert resp.status_code == 200 + payload = resp.get_json() + assert isinstance(payload, dict) + assert "users" in payload + assert any(user["username"] == "admin" for user in payload["users"]) + + +def test_create_user_validates_payload(prepared_app): + client = prepared_app.test_client() + _login(client) + + resp = client.post("/api/users", json={"username": "bob"}) + assert resp.status_code == 400 + + payload = { + "username": "bob", + "password_sha512": hashlib.sha512(b"pw").hexdigest(), + "role": "User", + } + resp = client.post("/api/users", json=payload) + assert resp.status_code == 200 + + # Duplicate username should conflict + resp = client.post("/api/users", json=payload) + assert resp.status_code == 409 + + +def test_delete_user_handles_edge_cases(prepared_app): + client = prepared_app.test_client() + _login(client) + + # cannot delete the only user + resp = client.delete("/api/users/admin") + assert resp.status_code == 400 + + # create another user then delete them successfully + payload = { + "username": "alice", + "password_sha512": hashlib.sha512(b"pw").hexdigest(), + "role": "User", + } + client.post("/api/users", json=payload) + + resp = client.delete("/api/users/alice") + assert resp.status_code == 200 + + +def test_delete_user_prevents_self_deletion(prepared_app): + client = prepared_app.test_client() + _login(client) + + payload = { + "username": "charlie", + "password_sha512": hashlib.sha512(b"pw").hexdigest(), + "role": "User", + } + client.post("/api/users", json=payload) + + resp = client.delete("/api/users/admin") + assert resp.status_code == 400 + + +def test_change_role_updates_session(prepared_app): + client = prepared_app.test_client() + _login(client) + + payload = { + "username": "backup", + "password_sha512": hashlib.sha512(b"pw").hexdigest(), + "role": "Admin", + } + client.post("/api/users", json=payload) + + resp = client.post("/api/users/backup/role", json={"role": "User"}) + assert resp.status_code == 200 + + resp = client.post("/api/users/admin/role", json={"role": "User"}) + assert resp.status_code == 400 + + +def test_reset_password_requires_valid_hash(prepared_app): + client = prepared_app.test_client() + _login(client) + + resp = client.post("/api/users/admin/reset_password", json={"password_sha512": "abc"}) + assert resp.status_code == 400 + + resp = client.post( + "/api/users/admin/reset_password", + json={"password_sha512": hashlib.sha512(b"new").hexdigest()}, + ) + assert resp.status_code == 200 + + +def test_update_mfa_returns_not_found_for_unknown_user(prepared_app): + client = prepared_app.test_client() + _login(client) + + resp = client.post("/api/users/missing/mfa", json={"enabled": True}) + assert resp.status_code == 404 diff --git a/Data/Engine/tests/test_operator_account_service.py b/Data/Engine/tests/test_operator_account_service.py new file mode 100644 index 0000000..1f0118a --- /dev/null +++ b/Data/Engine/tests/test_operator_account_service.py @@ -0,0 +1,191 @@ +"""Tests for the operator account management service.""" + +from __future__ import annotations + +import hashlib +import sqlite3 +from pathlib import Path +from typing import Callable + +import pytest + +pytest.importorskip("jwt") + +from Data.Engine.repositories.sqlite.connection import connection_factory +from Data.Engine.repositories.sqlite.user_repository import SQLiteUserRepository +from Data.Engine.services.auth.operator_account_service import ( + AccountNotFoundError, + CannotModifySelfError, + InvalidPasswordHashError, + InvalidRoleError, + LastAdminError, + LastUserError, + OperatorAccountService, + UsernameAlreadyExistsError, +) + + +def _prepare_db(path: Path) -> Callable[[], sqlite3.Connection]: + conn = sqlite3.connect(path) + conn.execute( + """ + CREATE TABLE users ( + id TEXT PRIMARY KEY, + username TEXT UNIQUE, + display_name TEXT, + password_sha512 TEXT, + role TEXT, + last_login INTEGER, + created_at INTEGER, + updated_at INTEGER, + mfa_enabled INTEGER, + mfa_secret TEXT + ) + """ + ) + conn.commit() + conn.close() + return connection_factory(path) + + +def _insert_user( + factory: Callable[[], sqlite3.Connection], + *, + user_id: str, + username: str, + password_hash: str, + role: str = "Admin", + mfa_enabled: int = 0, + mfa_secret: str = "", +) -> None: + conn = factory() + conn.execute( + """ + INSERT INTO users ( + id, username, display_name, password_sha512, role, + last_login, created_at, updated_at, mfa_enabled, mfa_secret + ) VALUES (?, ?, ?, ?, ?, 0, 0, 0, ?, ?) + """, + (user_id, username, username, password_hash, role, mfa_enabled, mfa_secret), + ) + conn.commit() + conn.close() + + +def _service(factory: Callable[[], sqlite3.Connection]) -> OperatorAccountService: + repo = SQLiteUserRepository(factory) + return OperatorAccountService(repo) + + +def test_list_accounts_returns_users(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + password_hash = hashlib.sha512(b"password").hexdigest() + _insert_user(factory, user_id="1", username="admin", password_hash=password_hash) + + service = _service(factory) + records = service.list_accounts() + + assert len(records) == 1 + assert records[0].username == "admin" + assert records[0].role == "Admin" + + +def test_create_account_enforces_uniqueness(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + service = _service(factory) + password_hash = hashlib.sha512(b"pw").hexdigest() + + service.create_account(username="admin", password_sha512=password_hash, role="Admin") + + with pytest.raises(UsernameAlreadyExistsError): + service.create_account(username="admin", password_sha512=password_hash, role="Admin") + + +def test_create_account_validates_password_hash(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + service = _service(factory) + + with pytest.raises(InvalidPasswordHashError): + service.create_account(username="user", password_sha512="abc", role="User") + + +def test_delete_account_protects_last_user(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + password_hash = hashlib.sha512(b"pw").hexdigest() + _insert_user(factory, user_id="1", username="admin", password_hash=password_hash) + + service = _service(factory) + + with pytest.raises(LastUserError): + service.delete_account("admin") + + +def test_delete_account_prevents_self_deletion(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + password_hash = hashlib.sha512(b"pw").hexdigest() + _insert_user(factory, user_id="1", username="admin", password_hash=password_hash) + _insert_user(factory, user_id="2", username="user", password_hash=password_hash, role="User") + + service = _service(factory) + + with pytest.raises(CannotModifySelfError): + service.delete_account("admin", actor="admin") + + +def test_delete_account_prevents_last_admin_removal(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + password_hash = hashlib.sha512(b"pw").hexdigest() + _insert_user(factory, user_id="1", username="admin", password_hash=password_hash) + _insert_user(factory, user_id="2", username="user", password_hash=password_hash, role="User") + + service = _service(factory) + + with pytest.raises(LastAdminError): + service.delete_account("admin") + + +def test_change_role_demotes_only_when_valid(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + password_hash = hashlib.sha512(b"pw").hexdigest() + _insert_user(factory, user_id="1", username="admin", password_hash=password_hash) + _insert_user(factory, user_id="2", username="backup", password_hash=password_hash) + + service = _service(factory) + service.change_role("backup", "User") + + with pytest.raises(LastAdminError): + service.change_role("admin", "User") + + with pytest.raises(InvalidRoleError): + service.change_role("admin", "invalid") + + +def test_reset_password_validates_hash(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + password_hash = hashlib.sha512(b"pw").hexdigest() + _insert_user(factory, user_id="1", username="admin", password_hash=password_hash) + + service = _service(factory) + + with pytest.raises(InvalidPasswordHashError): + service.reset_password("admin", "abc") + + new_hash = hashlib.sha512(b"new").hexdigest() + service.reset_password("admin", new_hash) + + +def test_update_mfa_raises_for_unknown_user(tmp_path): + db = tmp_path / "users.db" + factory = _prepare_db(db) + service = _service(factory) + + with pytest.raises(AccountNotFoundError): + service.update_mfa("missing", enabled=True, reset_secret=False)