From a9f2a39f23189c6395505680b82d75ec9998bf16 Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Sun, 19 Oct 2025 04:18:30 -0600 Subject: [PATCH] Removed Test Files --- tests/test_agent_installer_code.py | 58 ------- tests/test_agent_security.py | 57 ------- tests/test_enrollment_install_codes.py | 213 ------------------------- 3 files changed, 328 deletions(-) delete mode 100644 tests/test_agent_installer_code.py delete mode 100644 tests/test_agent_security.py delete mode 100644 tests/test_enrollment_install_codes.py diff --git a/tests/test_agent_installer_code.py b/tests/test_agent_installer_code.py deleted file mode 100644 index ef6732f..0000000 --- a/tests/test_agent_installer_code.py +++ /dev/null @@ -1,58 +0,0 @@ -import json -import sys - -import pytest - - -@pytest.fixture -def agent_module(tmp_path, monkeypatch): - settings_dir = tmp_path / "Agent" / "Borealis" / "Settings" - settings_dir.mkdir(parents=True) - system_config = settings_dir / "agent_settings_SYSTEM.json" - system_config.write_text(json.dumps({ - "config_file_watcher_interval": 2, - "agent_id": "", - "regions": {}, - "installer_code": "", - }, indent=2)) - current_config = settings_dir / "agent_settings_CURRENTUSER.json" - current_config.write_text(json.dumps({ - "config_file_watcher_interval": 2, - "agent_id": "", - "regions": {}, - "installer_code": "", - }, indent=2)) - - monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path)) - monkeypatch.setenv("BOREALIS_AGENT_MODE", "system") - monkeypatch.setenv("BOREALIS_AGENT_CONFIG", "") - monkeypatch.setitem(sys.modules, "PyQt5", None) - monkeypatch.setitem(sys.modules, "qasync", None) - monkeypatch.setattr(sys, "argv", ["agent.py", "--system-service", "--config", "SYSTEM"], raising=False) - - agent = pytest.importorskip( - "Data.Agent.agent", reason="agent module requires optional dependencies" - ) - return agent, system_config - - -def test_shared_installer_code_cache_allows_system_reuse(agent_module, tmp_path): - agent, system_config = agent_module - - client = agent.AgentHttpClient() - shared_code = "SHARED-CODE-1234" - client.key_store.cache_installer_code(shared_code, consumer="CURRENTUSER") - - # System agent should discover the cached code even though its config is empty. - resolved = client._resolve_installer_code() - assert resolved == shared_code - - # Config should now persist the adopted code to avoid repeated lookups. - data = json.loads(system_config.read_text()) - assert data.get("installer_code") == shared_code - - # After enrollment completes, the cache should be cleared for future runs. - client._consume_installer_code() - assert client.key_store.load_cached_installer_code() is None - data = json.loads(system_config.read_text()) - assert data.get("installer_code") == "" diff --git a/tests/test_agent_security.py b/tests/test_agent_security.py deleted file mode 100644 index db2dd69..0000000 --- a/tests/test_agent_security.py +++ /dev/null @@ -1,57 +0,0 @@ - -from __future__ import annotations - -import shutil -import tempfile -import unittest - -import pathlib -import sys - -ROOT = pathlib.Path(__file__).resolve().parents[1] -if str(ROOT) not in sys.path: - sys.path.insert(0, str(ROOT)) - -try: - from Data.Agent.security import AgentKeyStore # type: ignore - _IMPORT_ERROR: Exception | None = None -except Exception as exc: # pragma: no cover - handled via skip - AgentKeyStore = None # type: ignore - _IMPORT_ERROR = exc - - -@unittest.skipIf(AgentKeyStore is None, f"security module unavailable: {_IMPORT_ERROR}") -class AgentKeyStoreTests(unittest.TestCase): - def test_roundtrip(self): - tmp_dir = tempfile.mkdtemp(prefix="akstest-") - try: - store = AgentKeyStore(tmp_dir, scope="CURRENTUSER") - identity = store.load_or_create_identity() - - self.assertTrue(identity.public_key_b64) - self.assertEqual(len(identity.fingerprint), 64) - - store.save_guid("ABC-123") - self.assertEqual(store.load_guid(), "ABC-123") - - store.save_access_token("access-token", expires_at=12345) - self.assertEqual(store.load_access_token(), "access-token") - self.assertEqual(store.get_access_expiry(), 12345) - - store.save_refresh_token("refresh-token") - self.assertEqual(store.load_refresh_token(), "refresh-token") - - store.set_access_binding(identity.fingerprint) - self.assertEqual(store.get_access_binding(), identity.fingerprint) - - store.save_server_certificate("-----BEGIN CERT-----\nABC\n-----END CERT-----") - self.assertIn("BEGIN CERT", store.load_server_certificate() or "") - - store.save_server_signing_key("PUBKEYDATA") - self.assertEqual(store.load_server_signing_key(), "PUBKEYDATA") - finally: - shutil.rmtree(tmp_dir, ignore_errors=True) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_enrollment_install_codes.py b/tests/test_enrollment_install_codes.py deleted file mode 100644 index edcb81b..0000000 --- a/tests/test_enrollment_install_codes.py +++ /dev/null @@ -1,213 +0,0 @@ -import base64 -import os -import pathlib -import sqlite3 -import sys -import uuid - -import pytest -try: # pragma: no cover - optional dependency - from cryptography.hazmat.primitives import serialization - from cryptography.hazmat.primitives.asymmetric import ed25519 - _CRYPTO_IMPORT_ERROR: Exception | None = None -except Exception as exc: # pragma: no cover - dependency unavailable - serialization = None # type: ignore - ed25519 = None # type: ignore - _CRYPTO_IMPORT_ERROR = exc -try: # pragma: no cover - optional dependency - from flask import Flask - _FLASK_IMPORT_ERROR: Exception | None = None -except Exception as exc: # pragma: no cover - dependency unavailable - Flask = None # type: ignore - _FLASK_IMPORT_ERROR = exc - -ROOT = pathlib.Path(__file__).resolve().parents[1] -if str(ROOT) not in sys.path: - sys.path.insert(0, str(ROOT)) - -from Data.Server.Modules import db_migrations -from Data.Server.Modules.auth.rate_limit import SlidingWindowRateLimiter -from Data.Server.Modules.enrollment.nonce_store import NonceCache - -if Flask is not None: # pragma: no cover - dependency unavailable - from Data.Server.Modules.enrollment import routes as enrollment_routes -else: # pragma: no cover - dependency unavailable - enrollment_routes = None # type: ignore - - -class _DummyJWTService: - def issue_access_token(self, guid: str, fingerprint: str, token_version: int, expires_in: int = 900, extra_claims=None): - return f"token-{guid}" - - -class _DummySigner: - def public_base64_spki(self) -> str: - return "" - - -def _make_app(db_path: str, tls_path: str): - if Flask is None or enrollment_routes is None: # pragma: no cover - dependency unavailable - pytest.skip(f"flask unavailable: {_FLASK_IMPORT_ERROR}") - app = Flask(__name__) - - def _factory(): - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - return conn - - enrollment_routes.register( - app, - db_conn_factory=_factory, - log=lambda channel, message: None, - jwt_service=_DummyJWTService(), - tls_bundle_path=tls_path, - ip_rate_limiter=SlidingWindowRateLimiter(), - fp_rate_limiter=SlidingWindowRateLimiter(), - nonce_cache=NonceCache(ttl_seconds=30.0), - script_signer=_DummySigner(), - ) - return app, _factory - - -def _create_install_code(conn: sqlite3.Connection, code: str, *, max_uses: int = 2): - cur = conn.cursor() - record_id = str(uuid.uuid4()) - cur.execute( - """ - INSERT INTO enrollment_install_codes ( - id, code, expires_at, created_by_user_id, max_uses, use_count - ) - VALUES (?, ?, datetime('now', '+6 hours'), 'test-user', ?, 0) - """, - (record_id, code, max_uses), - ) - conn.commit() - return record_id - - -def _perform_enrollment_cycle(app, factory, code: str, private_key): - client = app.test_client() - public_der = private_key.public_key().public_bytes( - serialization.Encoding.DER, - serialization.PublicFormat.SubjectPublicKeyInfo, - ) - public_b64 = base64.b64encode(public_der).decode("ascii") - client_nonce = os.urandom(32) - payload = { - "hostname": "unit-test-host", - "enrollment_code": code, - "agent_pubkey": public_b64, - "client_nonce": base64.b64encode(client_nonce).decode("ascii"), - } - request_resp = client.post("/api/agent/enroll/request", json=payload) - assert request_resp.status_code == 200 - request_data = request_resp.get_json() - approval_reference = request_data["approval_reference"] - - with factory() as conn: - cur = conn.cursor() - cur.execute( - """ - UPDATE device_approvals - SET status = 'approved', - approved_by_user_id = 'tester' - WHERE approval_reference = ? - """, - (approval_reference,), - ) - cur.execute( - """ - SELECT server_nonce, client_nonce - FROM device_approvals - WHERE approval_reference = ? - """, - (approval_reference,), - ) - row = cur.fetchone() - assert row is not None - server_nonce_b64 = row["server_nonce"] - - server_nonce = base64.b64decode(server_nonce_b64) - proof_message = server_nonce + approval_reference.encode("utf-8") + client_nonce - proof_sig = private_key.sign(proof_message) - poll_payload = { - "approval_reference": approval_reference, - "client_nonce": base64.b64encode(client_nonce).decode("ascii"), - "proof_sig": base64.b64encode(proof_sig).decode("ascii"), - } - poll_resp = client.post("/api/agent/enroll/poll", json=poll_payload) - assert poll_resp.status_code == 200 - return poll_resp.get_json() - - -@pytest.mark.parametrize("max_uses", [2]) -@pytest.mark.skipif(ed25519 is None, reason=f"cryptography unavailable: {_CRYPTO_IMPORT_ERROR}") -@pytest.mark.skipif(Flask is None, reason=f"flask unavailable: {_FLASK_IMPORT_ERROR}") -def test_install_code_allows_multiple_and_reuse(tmp_path, max_uses): - db_path = tmp_path / "test.db" - conn = sqlite3.connect(db_path) - db_migrations.apply_all(conn) - _create_install_code(conn, "TEST-CODE-1234", max_uses=max_uses) - conn.close() - - tls_path = tmp_path / "tls.pem" - tls_path.write_text("TEST CERT") - - app, factory = _make_app(str(db_path), str(tls_path)) - - private_key = ed25519.Ed25519PrivateKey.generate() - - # First enrollment consumes one use but keeps the code active. - first = _perform_enrollment_cycle(app, factory, "TEST-CODE-1234", private_key) - assert first["status"] == "approved" - - with factory() as conn: - cur = conn.cursor() - cur.execute( - "SELECT use_count, max_uses, used_at, last_used_at FROM enrollment_install_codes WHERE code = ?", - ("TEST-CODE-1234",), - ) - row = cur.fetchone() - assert row is not None - assert row["use_count"] == 1 - assert row["max_uses"] == max_uses - assert row["used_at"] is None - assert row["last_used_at"] is not None - - # Second enrollment hits the configured max uses. - second = _perform_enrollment_cycle(app, factory, "TEST-CODE-1234", private_key) - assert second["status"] == "approved" - - with factory() as conn: - cur = conn.cursor() - cur.execute( - "SELECT use_count, used_at, last_used_at, used_by_guid FROM enrollment_install_codes WHERE code = ?", - ("TEST-CODE-1234",), - ) - row = cur.fetchone() - assert row is not None - assert row["use_count"] == max_uses - assert row["used_at"] is not None - assert row["last_used_at"] is not None - consumed_guid = row["used_by_guid"] - assert consumed_guid - - # Additional enrollments from the same identity reuse the stored GUID even after consumption. - third = _perform_enrollment_cycle(app, factory, "TEST-CODE-1234", private_key) - assert third["status"] == "approved" - - with factory() as conn: - cur = conn.cursor() - cur.execute( - "SELECT use_count, used_at, last_used_at, used_by_guid FROM enrollment_install_codes WHERE code = ?", - ("TEST-CODE-1234",), - ) - row = cur.fetchone() - assert row is not None - assert row["use_count"] == max_uses + 1 - assert row["used_by_guid"] == consumed_guid - assert row["used_at"] is not None - assert row["last_used_at"] is not None - - cur.execute("SELECT COUNT(*) FROM devices WHERE guid = ?", (consumed_guid,)) - assert cur.fetchone()[0] == 1