Removed Experimental Engine

This commit is contained in:
2025-10-25 23:37:02 -06:00
parent 05b72f17a6
commit e16746d407
117 changed files with 0 additions and 16887 deletions

View File

@@ -1 +0,0 @@
"""Test suite for the Borealis Engine."""

View File

@@ -1,72 +0,0 @@
"""Shared pytest fixtures for Engine HTTP interface tests."""
from __future__ import annotations
from pathlib import Path
import pytest
from Data.Engine.config.environment import (
DatabaseSettings,
EngineSettings,
FlaskSettings,
GitHubSettings,
ServerSettings,
SocketIOSettings,
)
from Data.Engine.interfaces.http import register_http_interfaces
from Data.Engine.repositories.sqlite import connection as sqlite_connection
from Data.Engine.repositories.sqlite import migrations as sqlite_migrations
from Data.Engine.server import create_app
from Data.Engine.services.container import build_service_container
@pytest.fixture()
def engine_settings(tmp_path: Path) -> EngineSettings:
"""Provision an EngineSettings instance backed by a temporary project root."""
project_root = tmp_path
static_root = project_root / "static"
static_root.mkdir()
(static_root / "index.html").write_text("<html></html>", encoding="utf-8")
database_path = project_root / "database.db"
return EngineSettings(
project_root=project_root,
debug=False,
database=DatabaseSettings(path=database_path, apply_migrations=False),
flask=FlaskSettings(
secret_key="test-key",
static_root=static_root,
cors_allowed_origins=("https://localhost",),
),
socketio=SocketIOSettings(cors_allowed_origins=("https://localhost",)),
server=ServerSettings(host="127.0.0.1", port=5000),
github=GitHubSettings(
default_repo="owner/repo",
default_branch="main",
refresh_interval_seconds=60,
cache_root=project_root / "cache",
),
)
@pytest.fixture()
def prepared_app(engine_settings: EngineSettings):
"""Create a Flask app instance with registered Engine interfaces."""
settings = engine_settings
settings.github.cache_root.mkdir(exist_ok=True, parents=True)
db_factory = sqlite_connection.connection_factory(settings.database.path)
with sqlite_connection.connection_scope(settings.database.path) as conn:
sqlite_migrations.apply_all(conn)
app = create_app(settings, db_factory=db_factory)
services = build_service_container(settings, db_factory=db_factory)
app.extensions["engine_services"] = services
register_http_interfaces(app, services)
app.config.update(TESTING=True)
return app

View File

@@ -1,74 +0,0 @@
import unittest
from Data.Engine.builders.device_auth import (
DeviceAuthRequestBuilder,
RefreshTokenRequestBuilder,
)
from Data.Engine.domain.device_auth import DeviceAuthErrorCode, DeviceAuthFailure
class DeviceAuthRequestBuilderTests(unittest.TestCase):
def test_build_successful_request(self) -> None:
request = (
DeviceAuthRequestBuilder()
.with_authorization("Bearer abc123")
.with_http_method("post")
.with_htu("https://example.test/api")
.with_service_context("currentUser")
.with_dpop_proof("proof")
.build()
)
self.assertEqual(request.access_token, "abc123")
self.assertEqual(request.http_method, "POST")
self.assertEqual(request.htu, "https://example.test/api")
self.assertEqual(request.service_context, "CURRENTUSER")
self.assertEqual(request.dpop_proof, "proof")
def test_missing_authorization_raises_failure(self) -> None:
builder = (
DeviceAuthRequestBuilder()
.with_http_method("GET")
.with_htu("/health")
)
with self.assertRaises(DeviceAuthFailure) as ctx:
builder.build()
self.assertEqual(ctx.exception.code, DeviceAuthErrorCode.MISSING_AUTHORIZATION)
class RefreshTokenRequestBuilderTests(unittest.TestCase):
def test_refresh_request_requires_all_fields(self) -> None:
request = (
RefreshTokenRequestBuilder()
.with_payload({"guid": "de305d54-75b4-431b-adb2-eb6b9e546014", "refresh_token": "tok"})
.with_http_method("post")
.with_htu("https://example.test/api")
.with_dpop_proof("proof")
.build()
)
self.assertEqual(request.guid.value, "DE305D54-75B4-431B-ADB2-EB6B9E546014")
self.assertEqual(request.refresh_token, "tok")
self.assertEqual(request.http_method, "POST")
self.assertEqual(request.htu, "https://example.test/api")
self.assertEqual(request.dpop_proof, "proof")
def test_refresh_request_missing_guid_raises_failure(self) -> None:
builder = (
RefreshTokenRequestBuilder()
.with_payload({"refresh_token": "tok"})
.with_http_method("POST")
.with_htu("https://example.test/api")
)
with self.assertRaises(DeviceAuthFailure) as ctx:
builder.build()
self.assertEqual(ctx.exception.code, DeviceAuthErrorCode.INVALID_CLAIMS)
self.assertIn("missing guid", ctx.exception.detail)
if __name__ == "__main__": # pragma: no cover - convenience for local runs
unittest.main()

View File

@@ -1,106 +0,0 @@
"""Tests for environment configuration helpers."""
from __future__ import annotations
from pathlib import Path
from Data.Engine.config.environment import load_environment
def test_static_root_prefers_engine_runtime(tmp_path, monkeypatch):
"""Engine static root should prefer the staged web-interface build."""
engine_build = tmp_path / "Engine" / "web-interface" / "build"
engine_build.mkdir(parents=True)
(engine_build / "index.html").write_text("<html></html>", encoding="utf-8")
# Ensure other fallbacks exist but should not be selected while the Engine
# runtime assets are present.
legacy_build = tmp_path / "Data" / "Server" / "WebUI" / "build"
legacy_build.mkdir(parents=True)
(legacy_build / "index.html").write_text("legacy", encoding="utf-8")
monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path))
monkeypatch.delenv("BOREALIS_STATIC_ROOT", raising=False)
settings = load_environment()
assert settings.flask.static_root == engine_build.resolve()
def test_static_root_env_override(tmp_path, monkeypatch):
"""Explicit overrides should win over filesystem detection."""
override = tmp_path / "custom" / "build"
override.mkdir(parents=True)
(override / "index.html").write_text("override", encoding="utf-8")
monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path))
monkeypatch.setenv("BOREALIS_STATIC_ROOT", str(override))
settings = load_environment()
assert settings.flask.static_root == override.resolve()
monkeypatch.delenv("BOREALIS_STATIC_ROOT", raising=False)
monkeypatch.delenv("BOREALIS_ROOT", raising=False)
def test_static_root_falls_back_to_engine_source(tmp_path, monkeypatch):
"""Engine data assets should serve when no build output exists."""
engine_source = tmp_path / "Data" / "Engine" / "web-interface"
engine_source.mkdir(parents=True)
(engine_source / "index.html").write_text("<html></html>", encoding="utf-8")
monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path))
monkeypatch.delenv("BOREALIS_STATIC_ROOT", raising=False)
settings = load_environment()
assert settings.flask.static_root == engine_source.resolve()
monkeypatch.delenv("BOREALIS_ROOT", raising=False)
def test_static_root_considers_runtime_copy(tmp_path, monkeypatch):
"""Runtime Server/web-interface copies should be considered when Data assets are missing."""
runtime_source = tmp_path / "Server" / "web-interface"
runtime_source.mkdir(parents=True)
(runtime_source / "index.html").write_text("runtime", encoding="utf-8")
monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path))
monkeypatch.delenv("BOREALIS_STATIC_ROOT", raising=False)
settings = load_environment()
assert settings.flask.static_root == runtime_source.resolve()
monkeypatch.delenv("BOREALIS_ROOT", raising=False)
def test_static_root_falls_back_to_legacy_assets(tmp_path, monkeypatch):
"""Legacy Data/Server/WebUI assets remain a valid fallback."""
legacy_source = tmp_path / "Data" / "Server" / "WebUI"
legacy_source.mkdir(parents=True)
(legacy_source / "index.html").write_text("legacy", encoding="utf-8")
monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path))
monkeypatch.delenv("BOREALIS_STATIC_ROOT", raising=False)
settings = load_environment()
assert settings.flask.static_root == legacy_source.resolve()
monkeypatch.delenv("BOREALIS_ROOT", raising=False)
def test_resolve_project_root_defaults_to_repository(monkeypatch):
"""The project root should resolve to the repository checkout."""
monkeypatch.delenv("BOREALIS_ROOT", raising=False)
from Data.Engine.config import environment as env_module
expected = Path(env_module.__file__).resolve().parents[3]
assert env_module._resolve_project_root() == expected

View File

@@ -1,65 +0,0 @@
from __future__ import annotations
import importlib
import os
import shutil
import ssl
import sys
import tempfile
import unittest
from pathlib import Path
from Data.Engine import runtime
class CertificateGenerationTests(unittest.TestCase):
def setUp(self) -> None:
self._tmpdir = Path(tempfile.mkdtemp(prefix="engine-cert-tests-"))
self.addCleanup(lambda: shutil.rmtree(self._tmpdir, ignore_errors=True))
self._previous_env: dict[str, str | None] = {}
for name in ("BOREALIS_CERTIFICATES_ROOT", "BOREALIS_SERVER_CERT_ROOT"):
self._previous_env[name] = os.environ.get(name)
os.environ[name] = str(self._tmpdir / name.lower())
runtime.certificates_root.cache_clear()
runtime.server_certificates_root.cache_clear()
module_name = "Data.Engine.services.crypto.certificates"
if module_name in sys.modules:
del sys.modules[module_name]
try:
self.certificates = importlib.import_module(module_name)
except ModuleNotFoundError as exc: # pragma: no cover - optional deps absent
self.skipTest(f"cryptography dependency unavailable: {exc}")
def tearDown(self) -> None: # pragma: no cover - environment cleanup
for name, value in self._previous_env.items():
if value is None:
os.environ.pop(name, None)
else:
os.environ[name] = value
runtime.certificates_root.cache_clear()
runtime.server_certificates_root.cache_clear()
def test_ensure_certificate_creates_material(self) -> None:
cert_path, key_path, bundle_path = self.certificates.ensure_certificate()
self.assertTrue(cert_path.exists(), "certificate was not generated")
self.assertTrue(key_path.exists(), "private key was not generated")
self.assertTrue(bundle_path.exists(), "bundle was not generated")
context = self.certificates.build_ssl_context()
self.assertIsInstance(context, ssl.SSLContext)
self.assertEqual(context.minimum_version, ssl.TLSVersion.TLSv1_3)
def test_certificate_paths_returns_strings(self) -> None:
cert_path, key_path, bundle_path = self.certificates.certificate_paths()
self.assertIsInstance(cert_path, str)
self.assertIsInstance(key_path, str)
self.assertIsInstance(bundle_path, str)
if __name__ == "__main__": # pragma: no cover - convenience
unittest.main()

View File

@@ -1,59 +0,0 @@
import unittest
from Data.Engine.domain.device_auth import (
DeviceAuthErrorCode,
DeviceAuthFailure,
DeviceFingerprint,
DeviceGuid,
sanitize_service_context,
)
class DeviceGuidTests(unittest.TestCase):
def test_guid_normalization_accepts_braces_and_lowercase(self) -> None:
guid = DeviceGuid("{de305d54-75b4-431b-adb2-eb6b9e546014}")
self.assertEqual(guid.value, "DE305D54-75B4-431B-ADB2-EB6B9E546014")
def test_guid_rejects_empty_string(self) -> None:
with self.assertRaises(ValueError):
DeviceGuid("")
class DeviceFingerprintTests(unittest.TestCase):
def test_fingerprint_normalization_trims_and_lowercases(self) -> None:
fingerprint = DeviceFingerprint(" AA:BB:CC ")
self.assertEqual(fingerprint.value, "aa:bb:cc")
def test_fingerprint_rejects_blank_input(self) -> None:
with self.assertRaises(ValueError):
DeviceFingerprint(" ")
class ServiceContextTests(unittest.TestCase):
def test_sanitize_service_context_returns_uppercase_only(self) -> None:
self.assertEqual(sanitize_service_context("system"), "SYSTEM")
def test_sanitize_service_context_filters_invalid_chars(self) -> None:
self.assertEqual(sanitize_service_context("sys tem!"), "SYSTEM")
def test_sanitize_service_context_returns_none_for_empty_result(self) -> None:
self.assertIsNone(sanitize_service_context("@@@"))
class DeviceAuthFailureTests(unittest.TestCase):
def test_to_dict_includes_retry_after_and_detail(self) -> None:
failure = DeviceAuthFailure(
DeviceAuthErrorCode.RATE_LIMITED,
http_status=429,
retry_after=30,
detail="too many attempts",
)
payload = failure.to_dict()
self.assertEqual(
payload,
{"error": "rate_limited", "retry_after": 30.0, "detail": "too many attempts"},
)
if __name__ == "__main__": # pragma: no cover - convenience for local runs
unittest.main()

View File

@@ -1,122 +0,0 @@
import base64
import sqlite3
from datetime import datetime, timezone
import pytest
from Data.Engine.repositories.sqlite import connection as sqlite_connection
from Data.Engine.repositories.sqlite import migrations as sqlite_migrations
from Data.Engine.repositories.sqlite.enrollment_repository import SQLiteEnrollmentRepository
from Data.Engine.repositories.sqlite.user_repository import SQLiteUserRepository
from Data.Engine.services.enrollment.admin_service import EnrollmentAdminService
def _build_service(tmp_path):
db_path = tmp_path / "admin.db"
conn = sqlite3.connect(db_path)
sqlite_migrations.apply_all(conn)
conn.close()
factory = sqlite_connection.connection_factory(db_path)
enrollment_repo = SQLiteEnrollmentRepository(factory)
user_repo = SQLiteUserRepository(factory)
fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
service = EnrollmentAdminService(
repository=enrollment_repo,
user_repository=user_repo,
clock=lambda: fixed_now,
)
return service, factory, fixed_now
def test_create_and_list_install_codes(tmp_path):
service, factory, fixed_now = _build_service(tmp_path)
record = service.create_install_code(ttl_hours=3, max_uses=5, created_by="admin")
assert record.code
assert record.max_uses == 5
assert record.status(now=fixed_now) == "active"
records = service.list_install_codes()
assert any(r.record_id == record.record_id for r in records)
# Invalid TTL should raise
with pytest.raises(ValueError):
service.create_install_code(ttl_hours=2, max_uses=1, created_by=None)
# Deleting should succeed and remove the record
assert service.delete_install_code(record.record_id) is True
remaining = service.list_install_codes()
assert all(r.record_id != record.record_id for r in remaining)
def test_list_device_approvals_includes_conflict(tmp_path):
service, factory, fixed_now = _build_service(tmp_path)
conn = factory()
cur = conn.cursor()
cur.execute(
"INSERT INTO sites (name, description, created_at) VALUES (?, ?, ?)",
("HQ", "Primary site", int(fixed_now.timestamp())),
)
site_id = cur.lastrowid
cur.execute(
"""
INSERT INTO devices (guid, hostname, created_at, last_seen, ssl_key_fingerprint, status)
VALUES (?, ?, ?, ?, ?, 'active')
""",
("11111111-1111-1111-1111-111111111111", "agent-one", int(fixed_now.timestamp()), int(fixed_now.timestamp()), "abc123",),
)
cur.execute(
"INSERT INTO device_sites (device_hostname, site_id, assigned_at) VALUES (?, ?, ?)",
("agent-one", site_id, int(fixed_now.timestamp())),
)
now_iso = fixed_now.isoformat()
cur.execute(
"""
INSERT INTO device_approvals (
id,
approval_reference,
guid,
hostname_claimed,
ssl_key_fingerprint_claimed,
enrollment_code_id,
status,
client_nonce,
server_nonce,
created_at,
updated_at,
approved_by_user_id,
agent_pubkey_der
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"approval-1",
"REF123",
None,
"agent-one",
"abc123",
"code-1",
"pending",
base64.b64encode(b"client").decode(),
base64.b64encode(b"server").decode(),
now_iso,
now_iso,
None,
b"pubkey",
),
)
conn.commit()
conn.close()
approvals = service.list_device_approvals()
assert len(approvals) == 1
record = approvals[0]
assert record.hostname_conflict is not None
assert record.hostname_conflict.fingerprint_match is True
assert record.conflict_requires_prompt is False

View File

@@ -1,353 +0,0 @@
import base64
import sqlite3
from datetime import datetime, timezone
from .test_http_auth import _login
def test_enrollment_codes_require_authentication(prepared_app):
client = prepared_app.test_client()
resp = client.get("/api/admin/enrollment-codes")
assert resp.status_code == 401
def test_enrollment_code_workflow(prepared_app):
client = prepared_app.test_client()
_login(client)
payload = {"ttl_hours": 3, "max_uses": 4}
resp = client.post("/api/admin/enrollment-codes", json=payload)
assert resp.status_code == 201
created = resp.get_json()
assert created["max_uses"] == 4
assert created["status"] == "active"
resp = client.get("/api/admin/enrollment-codes")
assert resp.status_code == 200
codes = resp.get_json().get("codes", [])
assert any(code["id"] == created["id"] for code in codes)
resp = client.delete(f"/api/admin/enrollment-codes/{created['id']}")
assert resp.status_code == 200
def test_device_approvals_listing(prepared_app, engine_settings):
client = prepared_app.test_client()
_login(client)
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
now = datetime.now(tz=timezone.utc)
cur.execute(
"INSERT INTO sites (name, description, created_at) VALUES (?, ?, ?)",
("HQ", "Primary", int(now.timestamp())),
)
site_id = cur.lastrowid
cur.execute(
"""
INSERT INTO devices (guid, hostname, created_at, last_seen, ssl_key_fingerprint, status)
VALUES (?, ?, ?, ?, ?, 'active')
""",
(
"22222222-2222-2222-2222-222222222222",
"approval-host",
int(now.timestamp()),
int(now.timestamp()),
"deadbeef",
),
)
cur.execute(
"INSERT INTO device_sites (device_hostname, site_id, assigned_at) VALUES (?, ?, ?)",
("approval-host", site_id, int(now.timestamp())),
)
now_iso = now.isoformat()
cur.execute(
"""
INSERT INTO device_approvals (
id,
approval_reference,
guid,
hostname_claimed,
ssl_key_fingerprint_claimed,
enrollment_code_id,
status,
client_nonce,
server_nonce,
created_at,
updated_at,
approved_by_user_id,
agent_pubkey_der
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"approval-http",
"REFHTTP",
None,
"approval-host",
"deadbeef",
"code-http",
"pending",
base64.b64encode(b"client").decode(),
base64.b64encode(b"server").decode(),
now_iso,
now_iso,
None,
b"pub",
),
)
conn.commit()
conn.close()
resp = client.get("/api/admin/device-approvals")
assert resp.status_code == 200
body = resp.get_json()
approvals = body.get("approvals", [])
assert any(a["id"] == "approval-http" for a in approvals)
record = next(a for a in approvals if a["id"] == "approval-http")
assert record.get("hostname_conflict", {}).get("fingerprint_match") is True
def test_device_approval_requires_resolution(prepared_app, engine_settings):
client = prepared_app.test_client()
_login(client)
now = datetime.now(tz=timezone.utc)
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"""
INSERT INTO devices (
guid,
hostname,
created_at,
last_seen,
ssl_key_fingerprint,
status
) VALUES (?, ?, ?, ?, ?, 'active')
""",
(
"33333333-3333-3333-3333-333333333333",
"conflict-host",
int(now.timestamp()),
int(now.timestamp()),
"existingfp",
),
)
now_iso = now.isoformat()
cur.execute(
"""
INSERT INTO device_approvals (
id,
approval_reference,
guid,
hostname_claimed,
ssl_key_fingerprint_claimed,
enrollment_code_id,
status,
client_nonce,
server_nonce,
created_at,
updated_at,
approved_by_user_id,
agent_pubkey_der
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"approval-conflict",
"REF-CONFLICT",
None,
"conflict-host",
"newfinger",
"code-conflict",
"pending",
base64.b64encode(b"client").decode(),
base64.b64encode(b"server").decode(),
now_iso,
now_iso,
None,
b"pub",
),
)
conn.commit()
conn.close()
resp = client.post("/api/admin/device-approvals/approval-conflict/approve", json={})
assert resp.status_code == 409
assert resp.get_json().get("error") == "conflict_resolution_required"
resp = client.post(
"/api/admin/device-approvals/approval-conflict/approve",
json={"conflict_resolution": "overwrite"},
)
assert resp.status_code == 200
body = resp.get_json()
assert body == {"status": "approved", "conflict_resolution": "overwrite"}
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"SELECT status, guid, approved_by_user_id FROM device_approvals WHERE id = ?",
("approval-conflict",),
)
row = cur.fetchone()
conn.close()
assert row[0] == "approved"
assert row[1] == "33333333-3333-3333-3333-333333333333"
assert row[2]
resp = client.post(
"/api/admin/device-approvals/approval-conflict/approve",
json={"conflict_resolution": "overwrite"},
)
assert resp.status_code == 409
assert resp.get_json().get("error") == "approval_not_pending"
def test_device_approval_auto_merge(prepared_app, engine_settings):
client = prepared_app.test_client()
_login(client)
now = datetime.now(tz=timezone.utc)
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"""
INSERT INTO devices (
guid,
hostname,
created_at,
last_seen,
ssl_key_fingerprint,
status
) VALUES (?, ?, ?, ?, ?, 'active')
""",
(
"44444444-4444-4444-4444-444444444444",
"merge-host",
int(now.timestamp()),
int(now.timestamp()),
"deadbeef",
),
)
now_iso = now.isoformat()
cur.execute(
"""
INSERT INTO device_approvals (
id,
approval_reference,
guid,
hostname_claimed,
ssl_key_fingerprint_claimed,
enrollment_code_id,
status,
client_nonce,
server_nonce,
created_at,
updated_at,
approved_by_user_id,
agent_pubkey_der
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"approval-merge",
"REF-MERGE",
None,
"merge-host",
"deadbeef",
"code-merge",
"pending",
base64.b64encode(b"client").decode(),
base64.b64encode(b"server").decode(),
now_iso,
now_iso,
None,
b"pub",
),
)
conn.commit()
conn.close()
resp = client.post("/api/admin/device-approvals/approval-merge/approve", json={})
assert resp.status_code == 200
body = resp.get_json()
assert body.get("status") == "approved"
assert body.get("conflict_resolution") == "auto_merge_fingerprint"
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"SELECT guid, status FROM device_approvals WHERE id = ?",
("approval-merge",),
)
row = cur.fetchone()
conn.close()
assert row[1] == "approved"
assert row[0] == "44444444-4444-4444-4444-444444444444"
def test_device_approval_deny(prepared_app, engine_settings):
client = prepared_app.test_client()
_login(client)
now = datetime.now(tz=timezone.utc)
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
now_iso = now.isoformat()
cur.execute(
"""
INSERT INTO device_approvals (
id,
approval_reference,
guid,
hostname_claimed,
ssl_key_fingerprint_claimed,
enrollment_code_id,
status,
client_nonce,
server_nonce,
created_at,
updated_at,
approved_by_user_id,
agent_pubkey_der
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"approval-deny",
"REF-DENY",
None,
"deny-host",
"cafebabe",
"code-deny",
"pending",
base64.b64encode(b"client").decode(),
base64.b64encode(b"server").decode(),
now_iso,
now_iso,
None,
b"pub",
),
)
conn.commit()
conn.close()
resp = client.post("/api/admin/device-approvals/approval-deny/deny", json={})
assert resp.status_code == 200
assert resp.get_json() == {"status": "denied"}
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"SELECT status FROM device_approvals WHERE id = ?",
("approval-deny",),
)
row = cur.fetchone()
conn.close()
assert row[0] == "denied"

View File

@@ -1,326 +0,0 @@
import pytest
pytest.importorskip("jwt")
import json
import sqlite3
import time
from datetime import datetime, timezone
from pathlib import Path
from Data.Engine.domain.device_auth import (
AccessTokenClaims,
DeviceAuthContext,
DeviceFingerprint,
DeviceGuid,
DeviceIdentity,
DeviceStatus,
)
def _insert_device(app, guid: str, fingerprint: str, hostname: str) -> None:
db_path = Path(app.config["ENGINE_DATABASE_PATH"])
now = int(time.time())
with sqlite3.connect(db_path) as conn:
conn.execute(
"""
INSERT INTO devices (
guid,
hostname,
created_at,
last_seen,
ssl_key_fingerprint,
token_version,
status,
key_added_at
) VALUES (?, ?, ?, ?, ?, ?, 'active', ?)
""",
(
guid,
hostname,
now,
now,
fingerprint.lower(),
1,
datetime.now(timezone.utc).isoformat(),
),
)
conn.commit()
def _build_context(guid: str, fingerprint: str, *, status: DeviceStatus = DeviceStatus.ACTIVE) -> DeviceAuthContext:
now = int(time.time())
claims = AccessTokenClaims(
subject="device",
guid=DeviceGuid(guid),
fingerprint=DeviceFingerprint(fingerprint),
token_version=1,
issued_at=now,
not_before=now,
expires_at=now + 600,
raw={"sub": "device"},
)
identity = DeviceIdentity(DeviceGuid(guid), DeviceFingerprint(fingerprint))
return DeviceAuthContext(
identity=identity,
access_token="token",
claims=claims,
status=status,
service_context="SYSTEM",
)
def test_heartbeat_updates_device(prepared_app, monkeypatch):
client = prepared_app.test_client()
guid = "DE305D54-75B4-431B-ADB2-EB6B9E546014"
fingerprint = "aa:bb:cc"
hostname = "device-heartbeat"
_insert_device(prepared_app, guid, fingerprint, hostname)
services = prepared_app.extensions["engine_services"]
context = _build_context(guid, fingerprint)
monkeypatch.setattr(services.device_auth, "authenticate", lambda request, path: context)
payload = {
"hostname": hostname,
"inventory": {"memory": [{"total": "16GB"}], "cpu": {"cores": 8}},
"metrics": {"operating_system": "Windows", "last_user": "Admin", "uptime": 120},
"external_ip": "1.2.3.4",
}
start = int(time.time())
resp = client.post(
"/api/agent/heartbeat",
json=payload,
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
body = resp.get_json()
assert body == {"status": "ok", "poll_after_ms": 15000}
db_path = Path(prepared_app.config["ENGINE_DATABASE_PATH"])
with sqlite3.connect(db_path) as conn:
row = conn.execute(
"SELECT last_seen, external_ip, memory, cpu FROM devices WHERE guid = ?",
(guid,),
).fetchone()
assert row is not None
last_seen, external_ip, memory_json, cpu_json = row
assert last_seen >= start
assert external_ip == "1.2.3.4"
assert json.loads(memory_json)[0]["total"] == "16GB"
assert json.loads(cpu_json)["cores"] == 8
def test_heartbeat_returns_404_when_device_missing(prepared_app, monkeypatch):
client = prepared_app.test_client()
guid = "9E295C27-8339-40C8-AD1A-6ED95C164A4A"
fingerprint = "11:22:33"
services = prepared_app.extensions["engine_services"]
context = _build_context(guid, fingerprint)
monkeypatch.setattr(services.device_auth, "authenticate", lambda request, path: context)
resp = client.post(
"/api/agent/heartbeat",
json={"hostname": "missing-device"},
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 404
assert resp.get_json() == {"error": "device_not_registered"}
def test_script_request_reports_status_and_signing_key(prepared_app, monkeypatch):
client = prepared_app.test_client()
guid = "2F8D76C0-38D4-4700-B247-3E90C03A67D7"
fingerprint = "44:55:66"
hostname = "device-script"
_insert_device(prepared_app, guid, fingerprint, hostname)
services = prepared_app.extensions["engine_services"]
context = _build_context(guid, fingerprint)
monkeypatch.setattr(services.device_auth, "authenticate", lambda request, path: context)
class DummySigner:
def public_base64_spki(self) -> str:
return "PUBKEY"
object.__setattr__(services, "script_signer", DummySigner())
resp = client.post(
"/api/agent/script/request",
json={"guid": guid},
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
body = resp.get_json()
assert body == {
"status": "idle",
"poll_after_ms": 30000,
"sig_alg": "ed25519",
"signing_key": "PUBKEY",
}
quarantined_context = _build_context(guid, fingerprint, status=DeviceStatus.QUARANTINED)
monkeypatch.setattr(services.device_auth, "authenticate", lambda request, path: quarantined_context)
resp = client.post(
"/api/agent/script/request",
json={},
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
assert resp.get_json()["status"] == "quarantined"
assert resp.get_json()["poll_after_ms"] == 60000
def test_agent_details_persists_inventory(prepared_app, monkeypatch):
client = prepared_app.test_client()
guid = "5C9D76E4-4C5A-4A5D-9B5D-1C2E3F4A5B6C"
fingerprint = "aa:bb:cc:dd"
hostname = "device-details"
_insert_device(prepared_app, guid, fingerprint, hostname)
services = prepared_app.extensions["engine_services"]
context = _build_context(guid, fingerprint)
monkeypatch.setattr(services.device_auth, "authenticate", lambda request, path: context)
payload = {
"hostname": hostname,
"agent_id": "AGENT-01",
"agent_hash": "hash-value",
"details": {
"summary": {
"hostname": hostname,
"device_type": "Laptop",
"last_user": "BUNNY-LAB\\nicole.rappe",
"operating_system": "Windows 11",
"description": "Primary workstation",
"last_reboot": "2025-10-01 10:00:00",
"uptime": 3600,
},
"memory": [{"slot": "DIMM0", "capacity": 17179869184}],
"storage": [{"model": "NVMe", "size": 512}],
"network": [{"adapter": "Ethernet", "ips": ["192.168.1.50"]}],
"software": [{"name": "Borealis Agent", "version": "2.0"}],
"cpu": {"name": "Intel Core i7", "logical_cores": 8, "base_clock_ghz": 3.4},
},
}
resp = client.post(
"/api/agent/details",
json=payload,
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
assert resp.get_json() == {"status": "ok"}
db_path = Path(prepared_app.config["ENGINE_DATABASE_PATH"])
with sqlite3.connect(db_path) as conn:
row = conn.execute(
"""
SELECT device_type, last_user, memory, storage, network, description
FROM devices
WHERE guid = ?
""",
(guid,),
).fetchone()
assert row is not None
device_type, last_user, memory_json, storage_json, network_json, description = row
assert device_type == "Laptop"
assert last_user == "BUNNY-LAB\\nicole.rappe"
assert description == "Primary workstation"
assert json.loads(memory_json)[0]["capacity"] == 17179869184
assert json.loads(storage_json)[0]["model"] == "NVMe"
assert json.loads(network_json)[0]["ips"][0] == "192.168.1.50"
resp = client.get("/api/devices")
assert resp.status_code == 200
listing = resp.get_json()
device = next((dev for dev in listing.get("devices", []) if dev["hostname"] == hostname), None)
assert device is not None
summary = device["summary"]
details = device["details"]
assert summary["device_type"] == "Laptop"
assert summary["last_user"] == "BUNNY-LAB\\nicole.rappe"
assert summary["created"]
assert summary.get("uptime_sec") == 3600
assert details["summary"]["device_type"] == "Laptop"
assert details["summary"]["last_reboot"] == "2025-10-01 10:00:00"
assert details["summary"]["created"] == summary["created"]
assert details["software"][0]["name"] == "Borealis Agent"
assert device["storage"][0]["model"] == "NVMe"
assert device["memory"][0]["capacity"] == 17179869184
assert device["cpu"]["name"] == "Intel Core i7"
def test_heartbeat_preserves_last_user_from_details(prepared_app, monkeypatch):
client = prepared_app.test_client()
guid = "7E8F90A1-B2C3-4D5E-8F90-A1B2C3D4E5F6"
fingerprint = "11:22:33:44"
hostname = "device-preserve"
_insert_device(prepared_app, guid, fingerprint, hostname)
services = prepared_app.extensions["engine_services"]
context = _build_context(guid, fingerprint)
monkeypatch.setattr(services.device_auth, "authenticate", lambda request, path: context)
client.post(
"/api/agent/details",
json={
"hostname": hostname,
"details": {
"summary": {"hostname": hostname, "last_user": "BUNNY-LAB\\nicole.rappe"}
},
},
headers={"Authorization": "Bearer token"},
)
client.post(
"/api/agent/heartbeat",
json={"hostname": hostname, "metrics": {"uptime": 120}},
headers={"Authorization": "Bearer token"},
)
db_path = Path(prepared_app.config["ENGINE_DATABASE_PATH"])
with sqlite3.connect(db_path) as conn:
row = conn.execute(
"SELECT last_user FROM devices WHERE guid = ?",
(guid,),
).fetchone()
assert row is not None
assert row[0] == "BUNNY-LAB\\nicole.rappe"
def test_heartbeat_uses_username_when_last_user_missing(prepared_app, monkeypatch):
client = prepared_app.test_client()
guid = "802A4E5F-1B2C-4D5E-8F90-A1B2C3D4E5F7"
fingerprint = "55:66:77:88"
hostname = "device-username"
_insert_device(prepared_app, guid, fingerprint, hostname)
services = prepared_app.extensions["engine_services"]
context = _build_context(guid, fingerprint)
monkeypatch.setattr(services.device_auth, "authenticate", lambda request, path: context)
resp = client.post(
"/api/agent/heartbeat",
json={"hostname": hostname, "metrics": {"username": "BUNNY-LAB\\alice.smith"}},
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
db_path = Path(prepared_app.config["ENGINE_DATABASE_PATH"])
with sqlite3.connect(db_path) as conn:
row = conn.execute(
"SELECT last_user FROM devices WHERE guid = ?",
(guid,),
).fetchone()
assert row is not None
assert row[0] == "BUNNY-LAB\\alice.smith"

View File

@@ -1,86 +0,0 @@
import pytest
pytest.importorskip("flask")
from .test_http_auth import _login
def test_assembly_crud_flow(prepared_app, engine_settings):
client = prepared_app.test_client()
_login(client)
resp = client.post(
"/api/assembly/create",
json={"island": "scripts", "kind": "folder", "path": "Utilities"},
)
assert resp.status_code == 200
resp = client.post(
"/api/assembly/create",
json={
"island": "scripts",
"kind": "file",
"path": "Utilities/sample",
"content": {"name": "Sample", "script": "Write-Output 'Hello'", "type": "powershell"},
},
)
assert resp.status_code == 200
body = resp.get_json()
rel_path = body.get("rel_path")
assert rel_path and rel_path.endswith(".json")
resp = client.get("/api/assembly/list?island=scripts")
assert resp.status_code == 200
listing = resp.get_json()
assert any(item["rel_path"] == rel_path for item in listing.get("items", []))
resp = client.get(f"/api/assembly/load?island=scripts&path={rel_path}")
assert resp.status_code == 200
loaded = resp.get_json()
assert loaded.get("assembly", {}).get("name") == "Sample"
resp = client.post(
"/api/assembly/rename",
json={
"island": "scripts",
"kind": "file",
"path": rel_path,
"new_name": "renamed",
},
)
assert resp.status_code == 200
renamed_rel = resp.get_json().get("rel_path")
assert renamed_rel and renamed_rel.endswith(".json")
resp = client.post(
"/api/assembly/move",
json={
"island": "scripts",
"path": renamed_rel,
"new_path": "Utilities/Nested/renamed.json",
"kind": "file",
},
)
assert resp.status_code == 200
resp = client.post(
"/api/assembly/delete",
json={
"island": "scripts",
"path": "Utilities/Nested/renamed.json",
"kind": "file",
},
)
assert resp.status_code == 200
resp = client.get("/api/assembly/list?island=scripts")
remaining = resp.get_json().get("items", [])
assert all(item["rel_path"] != "Utilities/Nested/renamed.json" for item in remaining)
def test_server_time_endpoint(prepared_app):
client = prepared_app.test_client()
resp = client.get("/api/server/time")
assert resp.status_code == 200
body = resp.get_json()
assert set(["epoch", "iso", "utc_iso", "timezone", "offset_seconds", "display"]).issubset(body)

View File

@@ -1,59 +0,0 @@
import hashlib
import pytest
pytest.importorskip("flask")
pytest.importorskip("jwt")
def _login(client) -> dict:
payload = {
"username": "admin",
"password_sha512": hashlib.sha512("Password".encode()).hexdigest(),
}
resp = client.post("/api/auth/login", json=payload)
assert resp.status_code == 200
data = resp.get_json()
assert isinstance(data, dict)
return data
def test_auth_me_returns_session_user(prepared_app):
client = prepared_app.test_client()
_login(client)
resp = client.get("/api/auth/me")
assert resp.status_code == 200
body = resp.get_json()
assert body == {
"username": "admin",
"display_name": "Administrator",
"role": "Admin",
}
def test_auth_me_uses_token_when_session_missing(prepared_app):
client = prepared_app.test_client()
login_data = _login(client)
token = login_data.get("token")
assert token
# New client without session
other_client = prepared_app.test_client()
other_client.set_cookie("borealis_auth", token)
resp = other_client.get("/api/auth/me")
assert resp.status_code == 200
body = resp.get_json()
assert body == {
"username": "admin",
"display_name": "Administrator",
"role": "Admin",
}
def test_auth_me_requires_authentication(prepared_app):
client = prepared_app.test_client()
resp = client.get("/api/auth/me")
assert resp.status_code == 401
body = resp.get_json()
assert body == {"error": "not_authenticated"}

View File

@@ -1,238 +0,0 @@
import json
from datetime import datetime, timezone
import sqlite3
import time
import pytest
pytest.importorskip("flask")
from .test_http_auth import _login
def _ensure_admin_session(client):
_login(client)
def test_sites_crud_flow(prepared_app):
client = prepared_app.test_client()
_ensure_admin_session(client)
resp = client.get("/api/sites")
assert resp.status_code == 200
assert resp.get_json() == {"sites": []}
create = client.post("/api/sites", json={"name": "HQ", "description": "Primary"})
assert create.status_code == 201
created = create.get_json()
assert created["name"] == "HQ"
listing = client.get("/api/sites")
sites = listing.get_json()["sites"]
assert len(sites) == 1
resp = client.post("/api/sites/assign", json={"site_id": created["id"], "hostnames": ["device-1"]})
assert resp.status_code == 200
mapping = client.get("/api/sites/device_map?hostnames=device-1")
data = mapping.get_json()["mapping"]
assert data["device-1"]["site_id"] == created["id"]
rename = client.post("/api/sites/rename", json={"id": created["id"], "new_name": "Main"})
assert rename.status_code == 200
assert rename.get_json()["name"] == "Main"
delete = client.post("/api/sites/delete", json={"ids": [created["id"]]})
assert delete.status_code == 200
assert delete.get_json()["deleted"] == 1
def test_devices_listing(prepared_app, engine_settings):
client = prepared_app.test_client()
_ensure_admin_session(client)
now = datetime.now(tz=timezone.utc)
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"""
INSERT INTO devices (
guid,
hostname,
description,
created_at,
agent_hash,
last_seen,
connection_type,
connection_endpoint
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"11111111-1111-1111-1111-111111111111",
"test-device",
"Test Device",
int(now.timestamp()),
"hashvalue",
int(now.timestamp()),
"",
"",
),
)
conn.commit()
conn.close()
resp = client.get("/api/devices")
assert resp.status_code == 200
devices = resp.get_json()["devices"]
assert any(device["hostname"] == "test-device" for device in devices)
def test_agent_hash_list_requires_local_request(prepared_app):
client = prepared_app.test_client()
_ensure_admin_session(client)
resp = client.get("/api/agent/hash_list", environ_overrides={"REMOTE_ADDR": "203.0.113.5"})
assert resp.status_code == 403
resp = client.get("/api/agent/hash_list", environ_overrides={"REMOTE_ADDR": "127.0.0.1"})
assert resp.status_code == 200
assert resp.get_json() == {"agents": []}
def test_credentials_list_requires_admin(prepared_app):
client = prepared_app.test_client()
resp = client.get("/api/credentials")
assert resp.status_code == 401
_ensure_admin_session(client)
resp = client.get("/api/credentials")
assert resp.status_code == 200
assert resp.get_json() == {"credentials": []}
def test_device_description_update(prepared_app, engine_settings):
client = prepared_app.test_client()
hostname = "device-desc"
guid = "A3D3F1E5-9B8C-4C6F-80F1-4D5E6F7A8B9C"
now = int(time.time())
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"""
INSERT INTO devices (
guid,
hostname,
description,
created_at,
last_seen
) VALUES (?, ?, '', ?, ?)
""",
(guid, hostname, now, now),
)
conn.commit()
conn.close()
resp = client.post(
f"/api/device/description/{hostname}",
json={"description": "Primary workstation"},
)
assert resp.status_code == 200
assert resp.get_json() == {"status": "ok"}
conn = sqlite3.connect(engine_settings.database.path)
row = conn.execute(
"SELECT description FROM devices WHERE hostname = ?",
(hostname,),
).fetchone()
conn.close()
assert row is not None
assert row[0] == "Primary workstation"
def test_device_details_returns_inventory(prepared_app, engine_settings):
client = prepared_app.test_client()
_ensure_admin_session(client)
hostname = "inventory-1"
guid = "B9F0A1C2-D3E4-5F67-890A-BCDEF1234567"
now = int(time.time())
memory = [{"slot": "DIMM1", "size_gb": 16}]
network = [{"name": "Ethernet", "mac": "AA:BB:CC:DD:EE:FF"}]
software = [{"name": "Agent", "version": "1.0.0"}]
storage = [{"model": "Disk", "size_gb": 512}]
cpu = {"model": "Intel", "cores": 8}
conn = sqlite3.connect(engine_settings.database.path)
cur = conn.cursor()
cur.execute(
"""
INSERT INTO devices (
guid,
hostname,
description,
created_at,
agent_hash,
memory,
network,
software,
storage,
cpu,
device_type,
domain,
external_ip,
internal_ip,
last_reboot,
last_seen,
last_user,
operating_system,
uptime,
agent_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
guid,
hostname,
"Workstation",
now,
"hashvalue",
json.dumps(memory),
json.dumps(network),
json.dumps(software),
json.dumps(storage),
json.dumps(cpu),
"Laptop",
"ACME",
"203.0.113.10",
"192.0.2.10",
"2024-01-01 12:00:00",
now,
"ACME\\tech",
"Windows 11",
7200,
"agent-001",
),
)
conn.commit()
conn.close()
resp = client.get(f"/api/device/details/{hostname}")
assert resp.status_code == 200
data = resp.get_json()
assert data["memory"] == memory
assert data["network"] == network
assert data["software"] == software
assert data["storage"] == storage
assert data["cpu"] == cpu
assert data["description"] == "Workstation"
assert data["agent_hash"] == "hashvalue"
assert data["agent_guid"].lower() == guid.lower()
assert data["last_user"] == "ACME\\tech"
assert data["operating_system"] == "Windows 11"
assert data["uptime"] == 7200
assert data["summary"]["hostname"] == hostname
assert data["details"]["memory"] == memory

View File

@@ -1,120 +0,0 @@
"""HTTP integration tests for operator account endpoints."""
from __future__ import annotations
import hashlib
from .test_http_auth import _login
def test_list_users_requires_authentication(prepared_app):
client = prepared_app.test_client()
resp = client.get("/api/users")
assert resp.status_code == 401
def test_list_users_returns_accounts(prepared_app):
client = prepared_app.test_client()
_login(client)
resp = client.get("/api/users")
assert resp.status_code == 200
payload = resp.get_json()
assert isinstance(payload, dict)
assert "users" in payload
assert any(user["username"] == "admin" for user in payload["users"])
def test_create_user_validates_payload(prepared_app):
client = prepared_app.test_client()
_login(client)
resp = client.post("/api/users", json={"username": "bob"})
assert resp.status_code == 400
payload = {
"username": "bob",
"password_sha512": hashlib.sha512(b"pw").hexdigest(),
"role": "User",
}
resp = client.post("/api/users", json=payload)
assert resp.status_code == 200
# Duplicate username should conflict
resp = client.post("/api/users", json=payload)
assert resp.status_code == 409
def test_delete_user_handles_edge_cases(prepared_app):
client = prepared_app.test_client()
_login(client)
# cannot delete the only user
resp = client.delete("/api/users/admin")
assert resp.status_code == 400
# create another user then delete them successfully
payload = {
"username": "alice",
"password_sha512": hashlib.sha512(b"pw").hexdigest(),
"role": "User",
}
client.post("/api/users", json=payload)
resp = client.delete("/api/users/alice")
assert resp.status_code == 200
def test_delete_user_prevents_self_deletion(prepared_app):
client = prepared_app.test_client()
_login(client)
payload = {
"username": "charlie",
"password_sha512": hashlib.sha512(b"pw").hexdigest(),
"role": "User",
}
client.post("/api/users", json=payload)
resp = client.delete("/api/users/admin")
assert resp.status_code == 400
def test_change_role_updates_session(prepared_app):
client = prepared_app.test_client()
_login(client)
payload = {
"username": "backup",
"password_sha512": hashlib.sha512(b"pw").hexdigest(),
"role": "Admin",
}
client.post("/api/users", json=payload)
resp = client.post("/api/users/backup/role", json={"role": "User"})
assert resp.status_code == 200
resp = client.post("/api/users/admin/role", json={"role": "User"})
assert resp.status_code == 400
def test_reset_password_requires_valid_hash(prepared_app):
client = prepared_app.test_client()
_login(client)
resp = client.post("/api/users/admin/reset_password", json={"password_sha512": "abc"})
assert resp.status_code == 400
resp = client.post(
"/api/users/admin/reset_password",
json={"password_sha512": hashlib.sha512(b"new").hexdigest()},
)
assert resp.status_code == 200
def test_update_mfa_returns_not_found_for_unknown_user(prepared_app):
client = prepared_app.test_client()
_login(client)
resp = client.post("/api/users/missing/mfa", json={"enabled": True})
assert resp.status_code == 404

View File

@@ -1,191 +0,0 @@
"""Tests for the operator account management service."""
from __future__ import annotations
import hashlib
import sqlite3
from pathlib import Path
from typing import Callable
import pytest
pytest.importorskip("jwt")
from Data.Engine.repositories.sqlite.connection import connection_factory
from Data.Engine.repositories.sqlite.user_repository import SQLiteUserRepository
from Data.Engine.services.auth.operator_account_service import (
AccountNotFoundError,
CannotModifySelfError,
InvalidPasswordHashError,
InvalidRoleError,
LastAdminError,
LastUserError,
OperatorAccountService,
UsernameAlreadyExistsError,
)
def _prepare_db(path: Path) -> Callable[[], sqlite3.Connection]:
conn = sqlite3.connect(path)
conn.execute(
"""
CREATE TABLE users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE,
display_name TEXT,
password_sha512 TEXT,
role TEXT,
last_login INTEGER,
created_at INTEGER,
updated_at INTEGER,
mfa_enabled INTEGER,
mfa_secret TEXT
)
"""
)
conn.commit()
conn.close()
return connection_factory(path)
def _insert_user(
factory: Callable[[], sqlite3.Connection],
*,
user_id: str,
username: str,
password_hash: str,
role: str = "Admin",
mfa_enabled: int = 0,
mfa_secret: str = "",
) -> None:
conn = factory()
conn.execute(
"""
INSERT INTO users (
id, username, display_name, password_sha512, role,
last_login, created_at, updated_at, mfa_enabled, mfa_secret
) VALUES (?, ?, ?, ?, ?, 0, 0, 0, ?, ?)
""",
(user_id, username, username, password_hash, role, mfa_enabled, mfa_secret),
)
conn.commit()
conn.close()
def _service(factory: Callable[[], sqlite3.Connection]) -> OperatorAccountService:
repo = SQLiteUserRepository(factory)
return OperatorAccountService(repo)
def test_list_accounts_returns_users(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
password_hash = hashlib.sha512(b"password").hexdigest()
_insert_user(factory, user_id="1", username="admin", password_hash=password_hash)
service = _service(factory)
records = service.list_accounts()
assert len(records) == 1
assert records[0].username == "admin"
assert records[0].role == "Admin"
def test_create_account_enforces_uniqueness(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
service = _service(factory)
password_hash = hashlib.sha512(b"pw").hexdigest()
service.create_account(username="admin", password_sha512=password_hash, role="Admin")
with pytest.raises(UsernameAlreadyExistsError):
service.create_account(username="admin", password_sha512=password_hash, role="Admin")
def test_create_account_validates_password_hash(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
service = _service(factory)
with pytest.raises(InvalidPasswordHashError):
service.create_account(username="user", password_sha512="abc", role="User")
def test_delete_account_protects_last_user(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
password_hash = hashlib.sha512(b"pw").hexdigest()
_insert_user(factory, user_id="1", username="admin", password_hash=password_hash)
service = _service(factory)
with pytest.raises(LastUserError):
service.delete_account("admin")
def test_delete_account_prevents_self_deletion(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
password_hash = hashlib.sha512(b"pw").hexdigest()
_insert_user(factory, user_id="1", username="admin", password_hash=password_hash)
_insert_user(factory, user_id="2", username="user", password_hash=password_hash, role="User")
service = _service(factory)
with pytest.raises(CannotModifySelfError):
service.delete_account("admin", actor="admin")
def test_delete_account_prevents_last_admin_removal(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
password_hash = hashlib.sha512(b"pw").hexdigest()
_insert_user(factory, user_id="1", username="admin", password_hash=password_hash)
_insert_user(factory, user_id="2", username="user", password_hash=password_hash, role="User")
service = _service(factory)
with pytest.raises(LastAdminError):
service.delete_account("admin")
def test_change_role_demotes_only_when_valid(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
password_hash = hashlib.sha512(b"pw").hexdigest()
_insert_user(factory, user_id="1", username="admin", password_hash=password_hash)
_insert_user(factory, user_id="2", username="backup", password_hash=password_hash)
service = _service(factory)
service.change_role("backup", "User")
with pytest.raises(LastAdminError):
service.change_role("admin", "User")
with pytest.raises(InvalidRoleError):
service.change_role("admin", "invalid")
def test_reset_password_validates_hash(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
password_hash = hashlib.sha512(b"pw").hexdigest()
_insert_user(factory, user_id="1", username="admin", password_hash=password_hash)
service = _service(factory)
with pytest.raises(InvalidPasswordHashError):
service.reset_password("admin", "abc")
new_hash = hashlib.sha512(b"new").hexdigest()
service.reset_password("admin", new_hash)
def test_update_mfa_raises_for_unknown_user(tmp_path):
db = tmp_path / "users.db"
factory = _prepare_db(db)
service = _service(factory)
with pytest.raises(AccountNotFoundError):
service.update_mfa("missing", enabled=True, reset_secret=False)

View File

@@ -1,63 +0,0 @@
"""Tests for operator authentication builders."""
from __future__ import annotations
import pytest
from Data.Engine.builders import (
OperatorLoginRequest,
OperatorMFAVerificationRequest,
build_login_request,
build_mfa_request,
)
def test_build_login_request_uses_explicit_hash():
payload = {"username": "Admin", "password_sha512": "abc123"}
result = build_login_request(payload)
assert isinstance(result, OperatorLoginRequest)
assert result.username == "Admin"
assert result.password_sha512 == "abc123"
def test_build_login_request_hashes_plain_password():
payload = {"username": "user", "password": "secret"}
result = build_login_request(payload)
assert isinstance(result, OperatorLoginRequest)
assert result.username == "user"
assert result.password_sha512
assert result.password_sha512 != "secret"
@pytest.mark.parametrize(
"payload",
[
{"password": "secret"},
{"username": ""},
{"username": "user"},
],
)
def test_build_login_request_validation(payload):
with pytest.raises(ValueError):
build_login_request(payload)
def test_build_mfa_request_normalizes_code():
payload = {"pending_token": "token", "code": "12 34-56"}
result = build_mfa_request(payload)
assert isinstance(result, OperatorMFAVerificationRequest)
assert result.pending_token == "token"
assert result.code == "123456"
def test_build_mfa_request_requires_token_and_code():
with pytest.raises(ValueError):
build_mfa_request({"code": "123"})
with pytest.raises(ValueError):
build_mfa_request({"pending_token": "token", "code": "12"})

View File

@@ -1,197 +0,0 @@
"""Tests for the operator authentication service."""
from __future__ import annotations
import hashlib
import sqlite3
from pathlib import Path
from typing import Callable
import pytest
pyotp = pytest.importorskip("pyotp")
from Data.Engine.builders import (
OperatorLoginRequest,
OperatorMFAVerificationRequest,
)
from Data.Engine.repositories.sqlite.connection import connection_factory
from Data.Engine.repositories.sqlite.user_repository import SQLiteUserRepository
from Data.Engine.services.auth.operator_auth_service import (
InvalidCredentialsError,
InvalidMFACodeError,
OperatorAuthService,
)
def _prepare_db(path: Path) -> Callable[[], sqlite3.Connection]:
conn = sqlite3.connect(path)
conn.execute(
"""
CREATE TABLE users (
id TEXT PRIMARY KEY,
username TEXT,
display_name TEXT,
password_sha512 TEXT,
role TEXT,
last_login INTEGER,
created_at INTEGER,
updated_at INTEGER,
mfa_enabled INTEGER,
mfa_secret TEXT
)
"""
)
conn.commit()
conn.close()
return connection_factory(path)
def _insert_user(
factory: Callable[[], sqlite3.Connection],
*,
user_id: str,
username: str,
password_hash: str,
role: str = "Admin",
mfa_enabled: int = 0,
mfa_secret: str = "",
) -> None:
conn = factory()
conn.execute(
"""
INSERT INTO users (
id, username, display_name, password_sha512, role,
last_login, created_at, updated_at, mfa_enabled, mfa_secret
) VALUES (?, ?, ?, ?, ?, 0, 0, 0, ?, ?)
""",
(user_id, username, username, password_hash, role, mfa_enabled, mfa_secret),
)
conn.commit()
conn.close()
def test_authenticate_success_updates_last_login(tmp_path):
db_path = tmp_path / "auth.db"
factory = _prepare_db(db_path)
password_hash = hashlib.sha512(b"password").hexdigest()
_insert_user(factory, user_id="1", username="admin", password_hash=password_hash)
repo = SQLiteUserRepository(factory)
service = OperatorAuthService(repo)
request = OperatorLoginRequest(username="admin", password_sha512=password_hash)
result = service.authenticate(request)
assert result.username == "admin"
conn = factory()
row = conn.execute("SELECT last_login FROM users WHERE username=?", ("admin",)).fetchone()
conn.close()
assert row[0] > 0
def test_authenticate_invalid_credentials(tmp_path):
db_path = tmp_path / "auth.db"
factory = _prepare_db(db_path)
repo = SQLiteUserRepository(factory)
service = OperatorAuthService(repo)
request = OperatorLoginRequest(username="missing", password_sha512="abc")
with pytest.raises(InvalidCredentialsError):
service.authenticate(request)
def test_mfa_verify_flow(tmp_path):
db_path = tmp_path / "auth.db"
factory = _prepare_db(db_path)
secret = pyotp.random_base32()
password_hash = hashlib.sha512(b"password").hexdigest()
_insert_user(
factory,
user_id="1",
username="admin",
password_hash=password_hash,
mfa_enabled=1,
mfa_secret=secret,
)
repo = SQLiteUserRepository(factory)
service = OperatorAuthService(repo)
login_request = OperatorLoginRequest(username="admin", password_sha512=password_hash)
challenge = service.authenticate(login_request)
assert challenge.stage == "verify"
totp = pyotp.TOTP(secret)
verify_request = OperatorMFAVerificationRequest(
pending_token=challenge.pending_token,
code=totp.now(),
)
result = service.verify_mfa(challenge, verify_request)
assert result.username == "admin"
def test_mfa_setup_flow_persists_secret(tmp_path):
db_path = tmp_path / "auth.db"
factory = _prepare_db(db_path)
password_hash = hashlib.sha512(b"password").hexdigest()
_insert_user(
factory,
user_id="1",
username="admin",
password_hash=password_hash,
mfa_enabled=1,
mfa_secret="",
)
repo = SQLiteUserRepository(factory)
service = OperatorAuthService(repo)
challenge = service.authenticate(OperatorLoginRequest(username="admin", password_sha512=password_hash))
assert challenge.stage == "setup"
assert challenge.secret
totp = pyotp.TOTP(challenge.secret)
verify_request = OperatorMFAVerificationRequest(
pending_token=challenge.pending_token,
code=totp.now(),
)
result = service.verify_mfa(challenge, verify_request)
assert result.username == "admin"
conn = factory()
stored_secret = conn.execute(
"SELECT mfa_secret FROM users WHERE username=?", ("admin",)
).fetchone()[0]
conn.close()
assert stored_secret
def test_mfa_invalid_code_raises(tmp_path):
db_path = tmp_path / "auth.db"
factory = _prepare_db(db_path)
secret = pyotp.random_base32()
password_hash = hashlib.sha512(b"password").hexdigest()
_insert_user(
factory,
user_id="1",
username="admin",
password_hash=password_hash,
mfa_enabled=1,
mfa_secret=secret,
)
repo = SQLiteUserRepository(factory)
service = OperatorAuthService(repo)
challenge = service.authenticate(OperatorLoginRequest(username="admin", password_sha512=password_hash))
verify_request = OperatorMFAVerificationRequest(
pending_token=challenge.pending_token,
code="000000",
)
with pytest.raises(InvalidMFACodeError):
service.verify_mfa(challenge, verify_request)

View File

@@ -1,83 +0,0 @@
import hashlib
import sqlite3
import unittest
from Data.Engine.repositories.sqlite import migrations
class MigrationTests(unittest.TestCase):
def test_apply_all_creates_expected_tables(self) -> None:
conn = sqlite3.connect(":memory:")
try:
migrations.apply_all(conn)
cursor = conn.cursor()
tables = {
row[0]
for row in cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
)
}
self.assertIn("devices", tables)
self.assertIn("refresh_tokens", tables)
self.assertIn("enrollment_install_codes", tables)
self.assertIn("device_approvals", tables)
self.assertIn("scheduled_jobs", tables)
self.assertIn("scheduled_job_runs", tables)
self.assertIn("github_token", tables)
self.assertIn("users", tables)
cursor.execute(
"SELECT username, role, password_sha512 FROM users WHERE LOWER(username)=LOWER(?)",
("admin",),
)
row = cursor.fetchone()
self.assertIsNotNone(row)
if row:
self.assertEqual(row[0], "admin")
self.assertEqual(row[1].lower(), "admin")
self.assertEqual(row[2], hashlib.sha512(b"Password").hexdigest())
finally:
conn.close()
def test_ensure_default_admin_promotes_existing_user(self) -> None:
conn = sqlite3.connect(":memory:")
try:
conn.execute(
"""
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
display_name TEXT,
password_sha512 TEXT,
role TEXT,
last_login INTEGER,
created_at INTEGER,
updated_at INTEGER,
mfa_enabled INTEGER DEFAULT 0,
mfa_secret TEXT
)
"""
)
conn.execute(
"INSERT INTO users (username, display_name, password_sha512, role) VALUES (?, ?, ?, ?)",
("admin", "Custom", "hash", "user"),
)
conn.commit()
migrations.ensure_default_admin(conn)
cursor = conn.cursor()
cursor.execute(
"SELECT role, password_sha512 FROM users WHERE LOWER(username)=LOWER(?)",
("admin",),
)
role, password_hash = cursor.fetchone()
self.assertEqual(role.lower(), "admin")
self.assertEqual(password_hash, "hash")
finally:
conn.close()
if __name__ == "__main__": # pragma: no cover - convenience for local runs
unittest.main()