mirror of
https://github.com/bunny-lab-io/Borealis.git
synced 2025-10-26 22:01:59 -06:00
Removed Test Files
This commit is contained in:
@@ -1,58 +0,0 @@
|
|||||||
import json
|
|
||||||
import sys
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def agent_module(tmp_path, monkeypatch):
|
|
||||||
settings_dir = tmp_path / "Agent" / "Borealis" / "Settings"
|
|
||||||
settings_dir.mkdir(parents=True)
|
|
||||||
system_config = settings_dir / "agent_settings_SYSTEM.json"
|
|
||||||
system_config.write_text(json.dumps({
|
|
||||||
"config_file_watcher_interval": 2,
|
|
||||||
"agent_id": "",
|
|
||||||
"regions": {},
|
|
||||||
"installer_code": "",
|
|
||||||
}, indent=2))
|
|
||||||
current_config = settings_dir / "agent_settings_CURRENTUSER.json"
|
|
||||||
current_config.write_text(json.dumps({
|
|
||||||
"config_file_watcher_interval": 2,
|
|
||||||
"agent_id": "",
|
|
||||||
"regions": {},
|
|
||||||
"installer_code": "",
|
|
||||||
}, indent=2))
|
|
||||||
|
|
||||||
monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path))
|
|
||||||
monkeypatch.setenv("BOREALIS_AGENT_MODE", "system")
|
|
||||||
monkeypatch.setenv("BOREALIS_AGENT_CONFIG", "")
|
|
||||||
monkeypatch.setitem(sys.modules, "PyQt5", None)
|
|
||||||
monkeypatch.setitem(sys.modules, "qasync", None)
|
|
||||||
monkeypatch.setattr(sys, "argv", ["agent.py", "--system-service", "--config", "SYSTEM"], raising=False)
|
|
||||||
|
|
||||||
agent = pytest.importorskip(
|
|
||||||
"Data.Agent.agent", reason="agent module requires optional dependencies"
|
|
||||||
)
|
|
||||||
return agent, system_config
|
|
||||||
|
|
||||||
|
|
||||||
def test_shared_installer_code_cache_allows_system_reuse(agent_module, tmp_path):
|
|
||||||
agent, system_config = agent_module
|
|
||||||
|
|
||||||
client = agent.AgentHttpClient()
|
|
||||||
shared_code = "SHARED-CODE-1234"
|
|
||||||
client.key_store.cache_installer_code(shared_code, consumer="CURRENTUSER")
|
|
||||||
|
|
||||||
# System agent should discover the cached code even though its config is empty.
|
|
||||||
resolved = client._resolve_installer_code()
|
|
||||||
assert resolved == shared_code
|
|
||||||
|
|
||||||
# Config should now persist the adopted code to avoid repeated lookups.
|
|
||||||
data = json.loads(system_config.read_text())
|
|
||||||
assert data.get("installer_code") == shared_code
|
|
||||||
|
|
||||||
# After enrollment completes, the cache should be cleared for future runs.
|
|
||||||
client._consume_installer_code()
|
|
||||||
assert client.key_store.load_cached_installer_code() is None
|
|
||||||
data = json.loads(system_config.read_text())
|
|
||||||
assert data.get("installer_code") == ""
|
|
||||||
@@ -1,57 +0,0 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import shutil
|
|
||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
import pathlib
|
|
||||||
import sys
|
|
||||||
|
|
||||||
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
|
||||||
if str(ROOT) not in sys.path:
|
|
||||||
sys.path.insert(0, str(ROOT))
|
|
||||||
|
|
||||||
try:
|
|
||||||
from Data.Agent.security import AgentKeyStore # type: ignore
|
|
||||||
_IMPORT_ERROR: Exception | None = None
|
|
||||||
except Exception as exc: # pragma: no cover - handled via skip
|
|
||||||
AgentKeyStore = None # type: ignore
|
|
||||||
_IMPORT_ERROR = exc
|
|
||||||
|
|
||||||
|
|
||||||
@unittest.skipIf(AgentKeyStore is None, f"security module unavailable: {_IMPORT_ERROR}")
|
|
||||||
class AgentKeyStoreTests(unittest.TestCase):
|
|
||||||
def test_roundtrip(self):
|
|
||||||
tmp_dir = tempfile.mkdtemp(prefix="akstest-")
|
|
||||||
try:
|
|
||||||
store = AgentKeyStore(tmp_dir, scope="CURRENTUSER")
|
|
||||||
identity = store.load_or_create_identity()
|
|
||||||
|
|
||||||
self.assertTrue(identity.public_key_b64)
|
|
||||||
self.assertEqual(len(identity.fingerprint), 64)
|
|
||||||
|
|
||||||
store.save_guid("ABC-123")
|
|
||||||
self.assertEqual(store.load_guid(), "ABC-123")
|
|
||||||
|
|
||||||
store.save_access_token("access-token", expires_at=12345)
|
|
||||||
self.assertEqual(store.load_access_token(), "access-token")
|
|
||||||
self.assertEqual(store.get_access_expiry(), 12345)
|
|
||||||
|
|
||||||
store.save_refresh_token("refresh-token")
|
|
||||||
self.assertEqual(store.load_refresh_token(), "refresh-token")
|
|
||||||
|
|
||||||
store.set_access_binding(identity.fingerprint)
|
|
||||||
self.assertEqual(store.get_access_binding(), identity.fingerprint)
|
|
||||||
|
|
||||||
store.save_server_certificate("-----BEGIN CERT-----\nABC\n-----END CERT-----")
|
|
||||||
self.assertIn("BEGIN CERT", store.load_server_certificate() or "")
|
|
||||||
|
|
||||||
store.save_server_signing_key("PUBKEYDATA")
|
|
||||||
self.assertEqual(store.load_server_signing_key(), "PUBKEYDATA")
|
|
||||||
finally:
|
|
||||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
@@ -1,213 +0,0 @@
|
|||||||
import base64
|
|
||||||
import os
|
|
||||||
import pathlib
|
|
||||||
import sqlite3
|
|
||||||
import sys
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
try: # pragma: no cover - optional dependency
|
|
||||||
from cryptography.hazmat.primitives import serialization
|
|
||||||
from cryptography.hazmat.primitives.asymmetric import ed25519
|
|
||||||
_CRYPTO_IMPORT_ERROR: Exception | None = None
|
|
||||||
except Exception as exc: # pragma: no cover - dependency unavailable
|
|
||||||
serialization = None # type: ignore
|
|
||||||
ed25519 = None # type: ignore
|
|
||||||
_CRYPTO_IMPORT_ERROR = exc
|
|
||||||
try: # pragma: no cover - optional dependency
|
|
||||||
from flask import Flask
|
|
||||||
_FLASK_IMPORT_ERROR: Exception | None = None
|
|
||||||
except Exception as exc: # pragma: no cover - dependency unavailable
|
|
||||||
Flask = None # type: ignore
|
|
||||||
_FLASK_IMPORT_ERROR = exc
|
|
||||||
|
|
||||||
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
|
||||||
if str(ROOT) not in sys.path:
|
|
||||||
sys.path.insert(0, str(ROOT))
|
|
||||||
|
|
||||||
from Data.Server.Modules import db_migrations
|
|
||||||
from Data.Server.Modules.auth.rate_limit import SlidingWindowRateLimiter
|
|
||||||
from Data.Server.Modules.enrollment.nonce_store import NonceCache
|
|
||||||
|
|
||||||
if Flask is not None: # pragma: no cover - dependency unavailable
|
|
||||||
from Data.Server.Modules.enrollment import routes as enrollment_routes
|
|
||||||
else: # pragma: no cover - dependency unavailable
|
|
||||||
enrollment_routes = None # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
class _DummyJWTService:
|
|
||||||
def issue_access_token(self, guid: str, fingerprint: str, token_version: int, expires_in: int = 900, extra_claims=None):
|
|
||||||
return f"token-{guid}"
|
|
||||||
|
|
||||||
|
|
||||||
class _DummySigner:
|
|
||||||
def public_base64_spki(self) -> str:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
|
|
||||||
def _make_app(db_path: str, tls_path: str):
|
|
||||||
if Flask is None or enrollment_routes is None: # pragma: no cover - dependency unavailable
|
|
||||||
pytest.skip(f"flask unavailable: {_FLASK_IMPORT_ERROR}")
|
|
||||||
app = Flask(__name__)
|
|
||||||
|
|
||||||
def _factory():
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
conn.row_factory = sqlite3.Row
|
|
||||||
return conn
|
|
||||||
|
|
||||||
enrollment_routes.register(
|
|
||||||
app,
|
|
||||||
db_conn_factory=_factory,
|
|
||||||
log=lambda channel, message: None,
|
|
||||||
jwt_service=_DummyJWTService(),
|
|
||||||
tls_bundle_path=tls_path,
|
|
||||||
ip_rate_limiter=SlidingWindowRateLimiter(),
|
|
||||||
fp_rate_limiter=SlidingWindowRateLimiter(),
|
|
||||||
nonce_cache=NonceCache(ttl_seconds=30.0),
|
|
||||||
script_signer=_DummySigner(),
|
|
||||||
)
|
|
||||||
return app, _factory
|
|
||||||
|
|
||||||
|
|
||||||
def _create_install_code(conn: sqlite3.Connection, code: str, *, max_uses: int = 2):
|
|
||||||
cur = conn.cursor()
|
|
||||||
record_id = str(uuid.uuid4())
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO enrollment_install_codes (
|
|
||||||
id, code, expires_at, created_by_user_id, max_uses, use_count
|
|
||||||
)
|
|
||||||
VALUES (?, ?, datetime('now', '+6 hours'), 'test-user', ?, 0)
|
|
||||||
""",
|
|
||||||
(record_id, code, max_uses),
|
|
||||||
)
|
|
||||||
conn.commit()
|
|
||||||
return record_id
|
|
||||||
|
|
||||||
|
|
||||||
def _perform_enrollment_cycle(app, factory, code: str, private_key):
|
|
||||||
client = app.test_client()
|
|
||||||
public_der = private_key.public_key().public_bytes(
|
|
||||||
serialization.Encoding.DER,
|
|
||||||
serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
||||||
)
|
|
||||||
public_b64 = base64.b64encode(public_der).decode("ascii")
|
|
||||||
client_nonce = os.urandom(32)
|
|
||||||
payload = {
|
|
||||||
"hostname": "unit-test-host",
|
|
||||||
"enrollment_code": code,
|
|
||||||
"agent_pubkey": public_b64,
|
|
||||||
"client_nonce": base64.b64encode(client_nonce).decode("ascii"),
|
|
||||||
}
|
|
||||||
request_resp = client.post("/api/agent/enroll/request", json=payload)
|
|
||||||
assert request_resp.status_code == 200
|
|
||||||
request_data = request_resp.get_json()
|
|
||||||
approval_reference = request_data["approval_reference"]
|
|
||||||
|
|
||||||
with factory() as conn:
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
UPDATE device_approvals
|
|
||||||
SET status = 'approved',
|
|
||||||
approved_by_user_id = 'tester'
|
|
||||||
WHERE approval_reference = ?
|
|
||||||
""",
|
|
||||||
(approval_reference,),
|
|
||||||
)
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
SELECT server_nonce, client_nonce
|
|
||||||
FROM device_approvals
|
|
||||||
WHERE approval_reference = ?
|
|
||||||
""",
|
|
||||||
(approval_reference,),
|
|
||||||
)
|
|
||||||
row = cur.fetchone()
|
|
||||||
assert row is not None
|
|
||||||
server_nonce_b64 = row["server_nonce"]
|
|
||||||
|
|
||||||
server_nonce = base64.b64decode(server_nonce_b64)
|
|
||||||
proof_message = server_nonce + approval_reference.encode("utf-8") + client_nonce
|
|
||||||
proof_sig = private_key.sign(proof_message)
|
|
||||||
poll_payload = {
|
|
||||||
"approval_reference": approval_reference,
|
|
||||||
"client_nonce": base64.b64encode(client_nonce).decode("ascii"),
|
|
||||||
"proof_sig": base64.b64encode(proof_sig).decode("ascii"),
|
|
||||||
}
|
|
||||||
poll_resp = client.post("/api/agent/enroll/poll", json=poll_payload)
|
|
||||||
assert poll_resp.status_code == 200
|
|
||||||
return poll_resp.get_json()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("max_uses", [2])
|
|
||||||
@pytest.mark.skipif(ed25519 is None, reason=f"cryptography unavailable: {_CRYPTO_IMPORT_ERROR}")
|
|
||||||
@pytest.mark.skipif(Flask is None, reason=f"flask unavailable: {_FLASK_IMPORT_ERROR}")
|
|
||||||
def test_install_code_allows_multiple_and_reuse(tmp_path, max_uses):
|
|
||||||
db_path = tmp_path / "test.db"
|
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
db_migrations.apply_all(conn)
|
|
||||||
_create_install_code(conn, "TEST-CODE-1234", max_uses=max_uses)
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
tls_path = tmp_path / "tls.pem"
|
|
||||||
tls_path.write_text("TEST CERT")
|
|
||||||
|
|
||||||
app, factory = _make_app(str(db_path), str(tls_path))
|
|
||||||
|
|
||||||
private_key = ed25519.Ed25519PrivateKey.generate()
|
|
||||||
|
|
||||||
# First enrollment consumes one use but keeps the code active.
|
|
||||||
first = _perform_enrollment_cycle(app, factory, "TEST-CODE-1234", private_key)
|
|
||||||
assert first["status"] == "approved"
|
|
||||||
|
|
||||||
with factory() as conn:
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute(
|
|
||||||
"SELECT use_count, max_uses, used_at, last_used_at FROM enrollment_install_codes WHERE code = ?",
|
|
||||||
("TEST-CODE-1234",),
|
|
||||||
)
|
|
||||||
row = cur.fetchone()
|
|
||||||
assert row is not None
|
|
||||||
assert row["use_count"] == 1
|
|
||||||
assert row["max_uses"] == max_uses
|
|
||||||
assert row["used_at"] is None
|
|
||||||
assert row["last_used_at"] is not None
|
|
||||||
|
|
||||||
# Second enrollment hits the configured max uses.
|
|
||||||
second = _perform_enrollment_cycle(app, factory, "TEST-CODE-1234", private_key)
|
|
||||||
assert second["status"] == "approved"
|
|
||||||
|
|
||||||
with factory() as conn:
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute(
|
|
||||||
"SELECT use_count, used_at, last_used_at, used_by_guid FROM enrollment_install_codes WHERE code = ?",
|
|
||||||
("TEST-CODE-1234",),
|
|
||||||
)
|
|
||||||
row = cur.fetchone()
|
|
||||||
assert row is not None
|
|
||||||
assert row["use_count"] == max_uses
|
|
||||||
assert row["used_at"] is not None
|
|
||||||
assert row["last_used_at"] is not None
|
|
||||||
consumed_guid = row["used_by_guid"]
|
|
||||||
assert consumed_guid
|
|
||||||
|
|
||||||
# Additional enrollments from the same identity reuse the stored GUID even after consumption.
|
|
||||||
third = _perform_enrollment_cycle(app, factory, "TEST-CODE-1234", private_key)
|
|
||||||
assert third["status"] == "approved"
|
|
||||||
|
|
||||||
with factory() as conn:
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute(
|
|
||||||
"SELECT use_count, used_at, last_used_at, used_by_guid FROM enrollment_install_codes WHERE code = ?",
|
|
||||||
("TEST-CODE-1234",),
|
|
||||||
)
|
|
||||||
row = cur.fetchone()
|
|
||||||
assert row is not None
|
|
||||||
assert row["use_count"] == max_uses + 1
|
|
||||||
assert row["used_by_guid"] == consumed_guid
|
|
||||||
assert row["used_at"] is not None
|
|
||||||
assert row["last_used_at"] is not None
|
|
||||||
|
|
||||||
cur.execute("SELECT COUNT(*) FROM devices WHERE guid = ?", (consumed_guid,))
|
|
||||||
assert cur.fetchone()[0] == 1
|
|
||||||
Reference in New Issue
Block a user