From 2482f01a166dce2bc70f8cb22cf52dac85cf0a88 Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Sun, 19 Oct 2025 14:39:43 -0600 Subject: [PATCH] Script Functionally FINALLY reinstated --- Data/Server/Modules/crypto/certificates.py | 328 ++++++++++++++++----- Data/Server/server.py | 2 +- 2 files changed, 263 insertions(+), 67 deletions(-) diff --git a/Data/Server/Modules/crypto/certificates.py b/Data/Server/Modules/crypto/certificates.py index cf95b61..d5f18a6 100644 --- a/Data/Server/Modules/crypto/certificates.py +++ b/Data/Server/Modules/crypto/certificates.py @@ -1,9 +1,10 @@ """ -Self-signed certificate management for Borealis. +Server TLS certificate management. -The production Flask server and the Vite dev server both consume the same -certificate chain so agents and browsers can pin a single trust anchor during -enrollment. +Borealis now issues a dedicated root CA and a leaf server certificate so that +agents can pin the CA without requiring a re-enrollment every time the server +certificate is refreshed. The CA is persisted alongside the server key so that +existing deployments can be upgraded in-place. """ from __future__ import annotations @@ -12,32 +13,36 @@ import os import ssl from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Tuple +from typing import Optional, Tuple from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.x509.oid import NameOID +from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID -from Modules.runtime import ensure_server_certificates_dir, server_certificates_path, runtime_path +from Modules.runtime import ensure_server_certificates_dir, runtime_path, server_certificates_path _CERT_DIR = server_certificates_path() _CERT_FILE = _CERT_DIR / "borealis-server-cert.pem" _KEY_FILE = _CERT_DIR / "borealis-server-key.pem" _BUNDLE_FILE = _CERT_DIR / "borealis-server-bundle.pem" +_CA_KEY_FILE = _CERT_DIR / "borealis-root-ca-key.pem" +_CA_CERT_FILE = _CERT_DIR / "borealis-root-ca.pem" _LEGACY_CERT_DIR = runtime_path("certs") _LEGACY_CERT_FILE = _LEGACY_CERT_DIR / "borealis-server-cert.pem" _LEGACY_KEY_FILE = _LEGACY_CERT_DIR / "borealis-server-key.pem" _LEGACY_BUNDLE_FILE = _LEGACY_CERT_DIR / "borealis-server-bundle.pem" -# 100-year lifetime (effectively "never" for self-signed deployments). -_CERT_VALIDITY = timedelta(days=365 * 100) +_ROOT_COMMON_NAME = "Borealis Root CA" +_ORG_NAME = "Borealis" +_ROOT_VALIDITY = timedelta(days=365 * 100) +_SERVER_VALIDITY = timedelta(days=365 * 5) def ensure_certificate(common_name: str = "Borealis Server") -> Tuple[Path, Path, Path]: """ - Ensure the self-signed certificate and key exist on disk. + Ensure the root CA, server certificate, and bundle exist on disk. Returns (cert_path, key_path, bundle_path). """ @@ -45,87 +50,210 @@ def ensure_certificate(common_name: str = "Borealis Server") -> Tuple[Path, Path ensure_server_certificates_dir() _migrate_legacy_material_if_present() - regenerate = not (_CERT_FILE.exists() and _KEY_FILE.exists()) - if not regenerate: - try: - with _CERT_FILE.open("rb") as fh: - cert = x509.load_pem_x509_certificate(fh.read()) - try: - expiry = cert.not_valid_after_utc # type: ignore[attr-defined] - except AttributeError: - expiry = cert.not_valid_after.replace(tzinfo=timezone.utc) - if expiry <= datetime.now(tz=timezone.utc): - regenerate = True - except Exception: - regenerate = True + ca_key, ca_cert, ca_regenerated = _ensure_root_ca() - if regenerate: - _generate_certificate(common_name) + server_cert = _load_certificate(_CERT_FILE) + needs_regen = ca_regenerated or _server_certificate_needs_regeneration(server_cert, ca_cert) + if needs_regen: + server_cert = _generate_server_certificate(common_name, ca_key, ca_cert) - if not _BUNDLE_FILE.exists(): - _BUNDLE_FILE.write_bytes(_CERT_FILE.read_bytes()) + if server_cert is None: + server_cert = _generate_server_certificate(common_name, ca_key, ca_cert) + + _write_bundle(server_cert, ca_cert) return _CERT_FILE, _KEY_FILE, _BUNDLE_FILE def _migrate_legacy_material_if_present() -> None: - if _CERT_FILE.exists() and _KEY_FILE.exists(): - return + # Promote legacy runtime certificates (Server/Borealis/certs) into the new location. + if not _CERT_FILE.exists() or not _KEY_FILE.exists(): + legacy_cert = _LEGACY_CERT_FILE + legacy_key = _LEGACY_KEY_FILE + if legacy_cert.exists() and legacy_key.exists(): + try: + ensure_server_certificates_dir() + if not _CERT_FILE.exists(): + _safe_copy(legacy_cert, _CERT_FILE) + if not _KEY_FILE.exists(): + _safe_copy(legacy_key, _KEY_FILE) + except Exception: + pass - legacy_cert = _LEGACY_CERT_FILE - legacy_key = _LEGACY_KEY_FILE - legacy_bundle = _LEGACY_BUNDLE_FILE - if not legacy_cert.exists() or not legacy_key.exists(): - return +def _ensure_root_ca() -> Tuple[ec.EllipticCurvePrivateKey, x509.Certificate, bool]: + regenerated = False + + ca_key: Optional[ec.EllipticCurvePrivateKey] = None + ca_cert: Optional[x509.Certificate] = None + + if _CA_KEY_FILE.exists() and _CA_CERT_FILE.exists(): + try: + ca_key = _load_private_key(_CA_KEY_FILE) + ca_cert = _load_certificate(_CA_CERT_FILE) + if ca_cert is not None and ca_key is not None: + expiry = _cert_not_after(ca_cert) + subject = ca_cert.subject + subject_cn = "" + try: + subject_cn = subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value # type: ignore[index] + except Exception: + subject_cn = "" + try: + basic = ca_cert.extensions.get_extension_for_class(x509.BasicConstraints).value # type: ignore[attr-defined] + is_ca = bool(basic.ca) + except Exception: + is_ca = False + if ( + expiry <= datetime.now(tz=timezone.utc) + or not is_ca + or subject_cn != _ROOT_COMMON_NAME + ): + regenerated = True + else: + regenerated = True + except Exception: + regenerated = True + else: + regenerated = True + + if regenerated or ca_key is None or ca_cert is None: + ca_key = ec.generate_private_key(ec.SECP384R1()) + public_key = ca_key.public_key() + + now = datetime.now(tz=timezone.utc) + builder = ( + x509.CertificateBuilder() + .subject_name( + x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, _ROOT_COMMON_NAME), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, _ORG_NAME), + ] + ) + ) + .issuer_name( + x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, _ROOT_COMMON_NAME), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, _ORG_NAME), + ] + ) + ) + .public_key(public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(now - timedelta(minutes=5)) + .not_valid_after(now + _ROOT_VALIDITY) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=True, + crl_sign=True, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.SubjectKeyIdentifier.from_public_key(public_key), + critical=False, + ) + ) + + builder = builder.add_extension( + x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key), + critical=False, + ) + + ca_cert = builder.sign(private_key=ca_key, algorithm=hashes.SHA384()) + + _CA_KEY_FILE.write_bytes( + ca_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + ) + _CA_CERT_FILE.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM)) + + _tighten_permissions(_CA_KEY_FILE) + _tighten_permissions(_CA_CERT_FILE) + else: + regenerated = False + + return ca_key, ca_cert, regenerated + + +def _server_certificate_needs_regeneration( + server_cert: Optional[x509.Certificate], + ca_cert: x509.Certificate, +) -> bool: + if server_cert is None: + return True try: - ensure_server_certificates_dir() - if not _CERT_FILE.exists(): - try: - legacy_cert.replace(_CERT_FILE) - except Exception: - _CERT_FILE.write_bytes(legacy_cert.read_bytes()) - if not _KEY_FILE.exists(): - try: - legacy_key.replace(_KEY_FILE) - except Exception: - _KEY_FILE.write_bytes(legacy_key.read_bytes()) - if legacy_bundle.exists() and not _BUNDLE_FILE.exists(): - try: - legacy_bundle.replace(_BUNDLE_FILE) - except Exception: - _BUNDLE_FILE.write_bytes(legacy_bundle.read_bytes()) + if server_cert.issuer != ca_cert.subject: + return True except Exception: - return + return True + + try: + expiry = _cert_not_after(server_cert) + if expiry <= datetime.now(tz=timezone.utc): + return True + except Exception: + return True + + try: + basic = server_cert.extensions.get_extension_for_class(x509.BasicConstraints).value # type: ignore[attr-defined] + if basic.ca: + return True + except Exception: + return True + + try: + eku = server_cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value # type: ignore[attr-defined] + if ExtendedKeyUsageOID.SERVER_AUTH not in eku: + return True + except Exception: + return True + + return False -def _generate_certificate(common_name: str) -> None: +def _generate_server_certificate( + common_name: str, + ca_key: ec.EllipticCurvePrivateKey, + ca_cert: x509.Certificate, +) -> x509.Certificate: private_key = ec.generate_private_key(ec.SECP384R1()) public_key = private_key.public_key() now = datetime.now(tz=timezone.utc) + ca_expiry = _cert_not_after(ca_cert) + candidate_expiry = now + _SERVER_VALIDITY + not_after = min(ca_expiry - timedelta(days=1), candidate_expiry) + builder = ( x509.CertificateBuilder() .subject_name( x509.Name( [ x509.NameAttribute(NameOID.COMMON_NAME, common_name), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Borealis"), - ] - ) - ) - .issuer_name( - x509.Name( - [ - x509.NameAttribute(NameOID.COMMON_NAME, common_name), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, _ORG_NAME), ] ) ) + .issuer_name(ca_cert.subject) .public_key(public_key) .serial_number(x509.random_serial_number()) .not_valid_before(now - timedelta(minutes=5)) - .not_valid_after(now + _CERT_VALIDITY) + .not_valid_after(not_after) .add_extension( x509.SubjectAlternativeName( [ @@ -136,10 +264,36 @@ def _generate_certificate(common_name: str) -> None: ), critical=False, ) - .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) + .add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), + critical=False, + ) + .add_extension( + x509.SubjectKeyIdentifier.from_public_key(public_key), + critical=False, + ) + .add_extension( + x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_key.public_key()), + critical=False, + ) ) - certificate = builder.sign(private_key=private_key, algorithm=hashes.SHA384()) + certificate = builder.sign(private_key=ca_key, algorithm=hashes.SHA384()) _KEY_FILE.write_bytes( private_key.private_bytes( @@ -150,10 +304,30 @@ def _generate_certificate(common_name: str) -> None: ) _CERT_FILE.write_bytes(certificate.public_bytes(serialization.Encoding.PEM)) - # Propagate filesystem permissions to restrict accidental disclosure. _tighten_permissions(_KEY_FILE) _tighten_permissions(_CERT_FILE) + return certificate + + +def _write_bundle(server_cert: x509.Certificate, ca_cert: x509.Certificate) -> None: + try: + server_pem = server_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + ca_pem = ca_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + except Exception: + return + + bundle = f"{server_pem}\n{ca_pem}\n" + _BUNDLE_FILE.write_text(bundle, encoding="utf-8") + _tighten_permissions(_BUNDLE_FILE) + + +def _safe_copy(src: Path, dst: Path) -> None: + try: + dst.write_bytes(src.read_bytes()) + except Exception: + pass + def _tighten_permissions(path: Path) -> None: try: @@ -163,11 +337,33 @@ def _tighten_permissions(path: Path) -> None: pass +def _load_private_key(path: Path) -> ec.EllipticCurvePrivateKey: + with path.open("rb") as fh: + return serialization.load_pem_private_key(fh.read(), password=None) + + +def _load_certificate(path: Path) -> Optional[x509.Certificate]: + try: + return x509.load_pem_x509_certificate(path.read_bytes()) + except Exception: + return None + + +def _cert_not_after(cert: x509.Certificate) -> datetime: + try: + return cert.not_valid_after_utc # type: ignore[attr-defined] + except AttributeError: + value = cert.not_valid_after + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value + + def build_ssl_context() -> ssl.SSLContext: - cert_path, key_path, _bundle_path = ensure_certificate() + cert_path, key_path, bundle_path = ensure_certificate() context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.minimum_version = ssl.TLSVersion.TLSv1_3 - context.load_cert_chain(certfile=str(cert_path), keyfile=str(key_path)) + context.load_cert_chain(certfile=str(bundle_path), keyfile=str(key_path)) return context diff --git a/Data/Server/server.py b/Data/Server/server.py index 147ec24..a3a73b6 100644 --- a/Data/Server/server.py +++ b/Data/Server/server.py @@ -8323,6 +8323,6 @@ if __name__ == "__main__": app, host="0.0.0.0", port=5000, - certfile=TLS_CERT_PATH, + certfile=TLS_BUNDLE_PATH, keyfile=TLS_KEY_PATH, )