Fix static asset fallback and seed default admin

This commit is contained in:
2025-10-22 19:37:47 -06:00
parent f361c51a5e
commit 7a9feebde5
6 changed files with 167 additions and 2 deletions

View File

@@ -66,6 +66,10 @@ def bootstrap() -> EngineRuntime:
else:
logger.info("migrations-skipped")
with sqlite_connection.connection_scope(settings.database_path) as conn:
sqlite_migrations.ensure_default_admin(conn)
logger.info("default-admin-ensured")
app = create_app(settings, db_factory=db_factory)
services = build_service_container(settings, db_factory=db_factory, logger=logger.getChild("services"))
app.extensions["engine_services"] = services

View File

@@ -122,6 +122,10 @@ def _resolve_static_root(project_root: Path) -> Path:
project_root / "Engine" / "web-interface",
project_root / "Data" / "Engine" / "WebUI" / "build",
project_root / "Data" / "Engine" / "WebUI",
project_root / "Server" / "web-interface" / "build",
project_root / "Server" / "web-interface",
project_root / "Server" / "WebUI" / "build",
project_root / "Server" / "WebUI",
project_root / "Data" / "Server" / "web-interface" / "build",
project_root / "Data" / "Server" / "web-interface",
project_root / "Data" / "Server" / "WebUI" / "build",

View File

@@ -9,7 +9,7 @@ from .connection import (
connection_factory,
connection_scope,
)
from .migrations import apply_all
from .migrations import apply_all, ensure_default_admin
__all__ = [
"SQLiteConnectionFactory",
@@ -18,6 +18,7 @@ __all__ = [
"connection_factory",
"connection_scope",
"apply_all",
"ensure_default_admin",
]
try: # pragma: no cover - optional dependency shim

View File

@@ -15,6 +15,10 @@ from typing import List, Optional, Sequence, Tuple
DEVICE_TABLE = "devices"
_DEFAULT_ADMIN_USERNAME = "admin"
_DEFAULT_ADMIN_PASSWORD_SHA512 = (
"e6c83b282aeb2e022844595721cc00bbda47cb24537c1779f9bb84f04039e1676e6ba8573e588da1052510e3aa0a32a9e55879ae22b0c2d62136fc0a3e85f8bb"
)
def apply_all(conn: sqlite3.Connection) -> None:
@@ -30,6 +34,8 @@ def apply_all(conn: sqlite3.Connection) -> None:
_ensure_github_token_table(conn)
_ensure_scheduled_jobs_table(conn)
_ensure_scheduled_job_run_tables(conn)
_ensure_users_table(conn)
_ensure_default_admin(conn)
conn.commit()
@@ -504,4 +510,86 @@ def _normalized_guid(value: Optional[str]) -> str:
return ""
return str(value).strip()
__all__ = ["apply_all"]
def _ensure_users_table(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
display_name TEXT,
password_sha512 TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'Admin',
last_login INTEGER,
created_at INTEGER,
updated_at INTEGER,
mfa_enabled INTEGER NOT NULL DEFAULT 0,
mfa_secret TEXT
)
"""
)
try:
cur.execute("PRAGMA table_info(users)")
columns = [row[1] for row in cur.fetchall()]
if "mfa_enabled" not in columns:
cur.execute("ALTER TABLE users ADD COLUMN mfa_enabled INTEGER NOT NULL DEFAULT 0")
if "mfa_secret" not in columns:
cur.execute("ALTER TABLE users ADD COLUMN mfa_secret TEXT")
except sqlite3.Error:
# Aligning the schema is best-effort; older deployments may lack ALTER
# TABLE privileges but can continue using existing columns.
pass
def _ensure_default_admin(conn: sqlite3.Connection) -> None:
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM users WHERE LOWER(role)='admin'")
row = cur.fetchone()
if row and (row[0] or 0):
return
now = int(datetime.now(timezone.utc).timestamp())
cur.execute(
"SELECT COUNT(*) FROM users WHERE LOWER(username)=LOWER(?)",
(_DEFAULT_ADMIN_USERNAME,),
)
existing = cur.fetchone()
if not existing or not (existing[0] or 0):
cur.execute(
"""
INSERT INTO users (
username, display_name, password_sha512, role,
last_login, created_at, updated_at, mfa_enabled, mfa_secret
) VALUES (?, ?, ?, 'Admin', 0, ?, ?, 0, NULL)
""",
(
_DEFAULT_ADMIN_USERNAME,
"Administrator",
_DEFAULT_ADMIN_PASSWORD_SHA512,
now,
now,
),
)
else:
cur.execute(
"""
UPDATE users
SET role='Admin',
updated_at=?
WHERE LOWER(username)=LOWER(?)
AND LOWER(role)!='admin'
""",
(now, _DEFAULT_ADMIN_USERNAME),
)
def ensure_default_admin(conn: sqlite3.Connection) -> None:
"""Guarantee that at least one admin account exists."""
_ensure_users_table(conn)
_ensure_default_admin(conn)
conn.commit()
__all__ = ["apply_all", "ensure_default_admin"]

View File

@@ -63,6 +63,23 @@ def test_static_root_falls_back_to_legacy_source(tmp_path, monkeypatch):
monkeypatch.delenv("BOREALIS_ROOT", raising=False)
def test_static_root_considers_runtime_copy(tmp_path, monkeypatch):
"""Runtime Server/WebUI copies should be considered when Data assets are missing."""
runtime_source = tmp_path / "Server" / "WebUI"
runtime_source.mkdir(parents=True)
(runtime_source / "index.html").write_text("runtime", encoding="utf-8")
monkeypatch.setenv("BOREALIS_ROOT", str(tmp_path))
monkeypatch.delenv("BOREALIS_STATIC_ROOT", raising=False)
settings = load_environment()
assert settings.flask.static_root == runtime_source.resolve()
monkeypatch.delenv("BOREALIS_ROOT", raising=False)
def test_resolve_project_root_defaults_to_repository(monkeypatch):
"""The project root should resolve to the repository checkout."""

View File

@@ -1,3 +1,4 @@
import hashlib
import sqlite3
import unittest
@@ -24,6 +25,56 @@ class MigrationTests(unittest.TestCase):
self.assertIn("scheduled_jobs", tables)
self.assertIn("scheduled_job_runs", tables)
self.assertIn("github_token", tables)
self.assertIn("users", tables)
cursor.execute(
"SELECT username, role, password_sha512 FROM users WHERE LOWER(username)=LOWER(?)",
("admin",),
)
row = cursor.fetchone()
self.assertIsNotNone(row)
if row:
self.assertEqual(row[0], "admin")
self.assertEqual(row[1].lower(), "admin")
self.assertEqual(row[2], hashlib.sha512(b"Password").hexdigest())
finally:
conn.close()
def test_ensure_default_admin_promotes_existing_user(self) -> None:
conn = sqlite3.connect(":memory:")
try:
conn.execute(
"""
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
display_name TEXT,
password_sha512 TEXT,
role TEXT,
last_login INTEGER,
created_at INTEGER,
updated_at INTEGER,
mfa_enabled INTEGER DEFAULT 0,
mfa_secret TEXT
)
"""
)
conn.execute(
"INSERT INTO users (username, display_name, password_sha512, role) VALUES (?, ?, ?, ?)",
("admin", "Custom", "hash", "user"),
)
conn.commit()
migrations.ensure_default_admin(conn)
cursor = conn.cursor()
cursor.execute(
"SELECT role, password_sha512 FROM users WHERE LOWER(username)=LOWER(?)",
("admin",),
)
role, password_hash = cursor.fetchone()
self.assertEqual(role.lower(), "admin")
self.assertEqual(password_hash, "hash")
finally:
conn.close()