Removed Legacy Server Codebase

This commit is contained in:
2025-11-01 03:58:43 -06:00
parent bec43418c1
commit da37098d91
106 changed files with 6 additions and 36891 deletions

View File

@@ -1 +0,0 @@

View File

@@ -1,372 +0,0 @@
"""
Server TLS certificate management.
Borealis now issues a dedicated root CA and a leaf server certificate so that
agents can pin the CA without requiring a re-enrollment every time the server
certificate is refreshed. The CA is persisted alongside the server key so that
existing deployments can be upgraded in-place.
"""
from __future__ import annotations
import os
import ssl
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Tuple
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
from Modules.runtime import ensure_server_certificates_dir, runtime_path, server_certificates_path
_CERT_DIR = server_certificates_path()
_CERT_FILE = _CERT_DIR / "borealis-server-cert.pem"
_KEY_FILE = _CERT_DIR / "borealis-server-key.pem"
_BUNDLE_FILE = _CERT_DIR / "borealis-server-bundle.pem"
_CA_KEY_FILE = _CERT_DIR / "borealis-root-ca-key.pem"
_CA_CERT_FILE = _CERT_DIR / "borealis-root-ca.pem"
_LEGACY_CERT_DIR = runtime_path("certs")
_LEGACY_CERT_FILE = _LEGACY_CERT_DIR / "borealis-server-cert.pem"
_LEGACY_KEY_FILE = _LEGACY_CERT_DIR / "borealis-server-key.pem"
_LEGACY_BUNDLE_FILE = _LEGACY_CERT_DIR / "borealis-server-bundle.pem"
_ROOT_COMMON_NAME = "Borealis Root CA"
_ORG_NAME = "Borealis"
_ROOT_VALIDITY = timedelta(days=365 * 100)
_SERVER_VALIDITY = timedelta(days=365 * 5)
def ensure_certificate(common_name: str = "Borealis Server") -> Tuple[Path, Path, Path]:
"""
Ensure the root CA, server certificate, and bundle exist on disk.
Returns (cert_path, key_path, bundle_path).
"""
ensure_server_certificates_dir()
_migrate_legacy_material_if_present()
ca_key, ca_cert, ca_regenerated = _ensure_root_ca()
server_cert = _load_certificate(_CERT_FILE)
needs_regen = ca_regenerated or _server_certificate_needs_regeneration(server_cert, ca_cert)
if needs_regen:
server_cert = _generate_server_certificate(common_name, ca_key, ca_cert)
if server_cert is None:
server_cert = _generate_server_certificate(common_name, ca_key, ca_cert)
_write_bundle(server_cert, ca_cert)
return _CERT_FILE, _KEY_FILE, _BUNDLE_FILE
def _migrate_legacy_material_if_present() -> None:
# Promote legacy runtime certificates (Server/Borealis/certs) into the new location.
if not _CERT_FILE.exists() or not _KEY_FILE.exists():
legacy_cert = _LEGACY_CERT_FILE
legacy_key = _LEGACY_KEY_FILE
if legacy_cert.exists() and legacy_key.exists():
try:
ensure_server_certificates_dir()
if not _CERT_FILE.exists():
_safe_copy(legacy_cert, _CERT_FILE)
if not _KEY_FILE.exists():
_safe_copy(legacy_key, _KEY_FILE)
except Exception:
pass
def _ensure_root_ca() -> Tuple[ec.EllipticCurvePrivateKey, x509.Certificate, bool]:
regenerated = False
ca_key: Optional[ec.EllipticCurvePrivateKey] = None
ca_cert: Optional[x509.Certificate] = None
if _CA_KEY_FILE.exists() and _CA_CERT_FILE.exists():
try:
ca_key = _load_private_key(_CA_KEY_FILE)
ca_cert = _load_certificate(_CA_CERT_FILE)
if ca_cert is not None and ca_key is not None:
expiry = _cert_not_after(ca_cert)
subject = ca_cert.subject
subject_cn = ""
try:
subject_cn = subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value # type: ignore[index]
except Exception:
subject_cn = ""
try:
basic = ca_cert.extensions.get_extension_for_class(x509.BasicConstraints).value # type: ignore[attr-defined]
is_ca = bool(basic.ca)
except Exception:
is_ca = False
if (
expiry <= datetime.now(tz=timezone.utc)
or not is_ca
or subject_cn != _ROOT_COMMON_NAME
):
regenerated = True
else:
regenerated = True
except Exception:
regenerated = True
else:
regenerated = True
if regenerated or ca_key is None or ca_cert is None:
ca_key = ec.generate_private_key(ec.SECP384R1())
public_key = ca_key.public_key()
now = datetime.now(tz=timezone.utc)
builder = (
x509.CertificateBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, _ROOT_COMMON_NAME),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, _ORG_NAME),
]
)
)
.issuer_name(
x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, _ROOT_COMMON_NAME),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, _ORG_NAME),
]
)
)
.public_key(public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(now - timedelta(minutes=5))
.not_valid_after(now + _ROOT_VALIDITY)
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.SubjectKeyIdentifier.from_public_key(public_key),
critical=False,
)
)
builder = builder.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key),
critical=False,
)
ca_cert = builder.sign(private_key=ca_key, algorithm=hashes.SHA384())
_CA_KEY_FILE.write_bytes(
ca_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
_CA_CERT_FILE.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM))
_tighten_permissions(_CA_KEY_FILE)
_tighten_permissions(_CA_CERT_FILE)
else:
regenerated = False
return ca_key, ca_cert, regenerated
def _server_certificate_needs_regeneration(
server_cert: Optional[x509.Certificate],
ca_cert: x509.Certificate,
) -> bool:
if server_cert is None:
return True
try:
if server_cert.issuer != ca_cert.subject:
return True
except Exception:
return True
try:
expiry = _cert_not_after(server_cert)
if expiry <= datetime.now(tz=timezone.utc):
return True
except Exception:
return True
try:
basic = server_cert.extensions.get_extension_for_class(x509.BasicConstraints).value # type: ignore[attr-defined]
if basic.ca:
return True
except Exception:
return True
try:
eku = server_cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value # type: ignore[attr-defined]
if ExtendedKeyUsageOID.SERVER_AUTH not in eku:
return True
except Exception:
return True
return False
def _generate_server_certificate(
common_name: str,
ca_key: ec.EllipticCurvePrivateKey,
ca_cert: x509.Certificate,
) -> x509.Certificate:
private_key = ec.generate_private_key(ec.SECP384R1())
public_key = private_key.public_key()
now = datetime.now(tz=timezone.utc)
ca_expiry = _cert_not_after(ca_cert)
candidate_expiry = now + _SERVER_VALIDITY
not_after = min(ca_expiry - timedelta(days=1), candidate_expiry)
builder = (
x509.CertificateBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, common_name),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, _ORG_NAME),
]
)
)
.issuer_name(ca_cert.subject)
.public_key(public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(now - timedelta(minutes=5))
.not_valid_after(not_after)
.add_extension(
x509.SubjectAlternativeName(
[
x509.DNSName("localhost"),
x509.DNSName("127.0.0.1"),
x509.DNSName("::1"),
]
),
critical=False,
)
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]),
critical=False,
)
.add_extension(
x509.SubjectKeyIdentifier.from_public_key(public_key),
critical=False,
)
.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_key.public_key()),
critical=False,
)
)
certificate = builder.sign(private_key=ca_key, algorithm=hashes.SHA384())
_KEY_FILE.write_bytes(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
_CERT_FILE.write_bytes(certificate.public_bytes(serialization.Encoding.PEM))
_tighten_permissions(_KEY_FILE)
_tighten_permissions(_CERT_FILE)
return certificate
def _write_bundle(server_cert: x509.Certificate, ca_cert: x509.Certificate) -> None:
try:
server_pem = server_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip()
ca_pem = ca_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip()
except Exception:
return
bundle = f"{server_pem}\n{ca_pem}\n"
_BUNDLE_FILE.write_text(bundle, encoding="utf-8")
_tighten_permissions(_BUNDLE_FILE)
def _safe_copy(src: Path, dst: Path) -> None:
try:
dst.write_bytes(src.read_bytes())
except Exception:
pass
def _tighten_permissions(path: Path) -> None:
try:
if os.name == "posix":
path.chmod(0o600)
except Exception:
pass
def _load_private_key(path: Path) -> ec.EllipticCurvePrivateKey:
with path.open("rb") as fh:
return serialization.load_pem_private_key(fh.read(), password=None)
def _load_certificate(path: Path) -> Optional[x509.Certificate]:
try:
return x509.load_pem_x509_certificate(path.read_bytes())
except Exception:
return None
def _cert_not_after(cert: x509.Certificate) -> datetime:
try:
return cert.not_valid_after_utc # type: ignore[attr-defined]
except AttributeError:
value = cert.not_valid_after
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value
def build_ssl_context() -> ssl.SSLContext:
cert_path, key_path, bundle_path = ensure_certificate()
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.minimum_version = ssl.TLSVersion.TLSv1_3
context.load_cert_chain(certfile=str(bundle_path), keyfile=str(key_path))
return context
def certificate_paths() -> Tuple[str, str, str]:
cert_path, key_path, bundle_path = ensure_certificate()
return str(cert_path), str(key_path), str(bundle_path)

View File

@@ -1,71 +0,0 @@
"""
Utility helpers for working with Ed25519 keys and fingerprints.
"""
from __future__ import annotations
import base64
import hashlib
import re
from typing import Tuple
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_der_public_key
from cryptography.hazmat.primitives.asymmetric import ed25519
def generate_ed25519_keypair() -> Tuple[ed25519.Ed25519PrivateKey, bytes]:
"""
Generate a new Ed25519 keypair.
Returns the private key object and the public key encoded as SubjectPublicKeyInfo DER bytes.
"""
private_key = ed25519.Ed25519PrivateKey.generate()
public_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
return private_key, public_key
def normalize_base64(data: str) -> str:
"""
Collapse whitespace and normalise URL-safe encodings so we can reliably decode.
"""
cleaned = re.sub(r"\\s+", "", data or "")
return cleaned.replace("-", "+").replace("_", "/")
def spki_der_from_base64(spki_b64: str) -> bytes:
return base64.b64decode(normalize_base64(spki_b64), validate=True)
def base64_from_spki_der(spki_der: bytes) -> str:
return base64.b64encode(spki_der).decode("ascii")
def fingerprint_from_spki_der(spki_der: bytes) -> str:
digest = hashlib.sha256(spki_der).hexdigest()
return digest.lower()
def fingerprint_from_base64_spki(spki_b64: str) -> str:
return fingerprint_from_spki_der(spki_der_from_base64(spki_b64))
def private_key_to_pem(private_key: ed25519.Ed25519PrivateKey) -> bytes:
return private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
def public_key_to_pem(public_spki_der: bytes) -> bytes:
public_key = load_der_public_key(public_spki_der)
return public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

View File

@@ -1,125 +0,0 @@
"""
Code-signing helpers for delivering scripts to agents.
"""
from __future__ import annotations
from pathlib import Path
from typing import Tuple
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from Modules.runtime import (
ensure_server_certificates_dir,
server_certificates_path,
runtime_path,
)
from .keys import base64_from_spki_der
_KEY_DIR = server_certificates_path("Code-Signing")
_SIGNING_KEY_FILE = _KEY_DIR / "borealis-script-ed25519.key"
_SIGNING_PUB_FILE = _KEY_DIR / "borealis-script-ed25519.pub"
_LEGACY_KEY_FILE = runtime_path("keys") / "borealis-script-ed25519.key"
_LEGACY_PUB_FILE = runtime_path("keys") / "borealis-script-ed25519.pub"
_OLD_RUNTIME_KEY_DIR = runtime_path("script_signing_keys")
_OLD_RUNTIME_KEY_FILE = _OLD_RUNTIME_KEY_DIR / "borealis-script-ed25519.key"
_OLD_RUNTIME_PUB_FILE = _OLD_RUNTIME_KEY_DIR / "borealis-script-ed25519.pub"
class ScriptSigner:
def __init__(self, private_key: ed25519.Ed25519PrivateKey):
self._private = private_key
self._public = private_key.public_key()
def sign(self, payload: bytes) -> bytes:
return self._private.sign(payload)
def public_spki_der(self) -> bytes:
return self._public.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
def public_base64_spki(self) -> str:
return base64_from_spki_der(self.public_spki_der())
def load_signer() -> ScriptSigner:
private_key = _load_or_create()
return ScriptSigner(private_key)
def _load_or_create() -> ed25519.Ed25519PrivateKey:
ensure_server_certificates_dir("Code-Signing")
_migrate_legacy_material_if_present()
if _SIGNING_KEY_FILE.exists():
with _SIGNING_KEY_FILE.open("rb") as fh:
return serialization.load_pem_private_key(fh.read(), password=None)
if _LEGACY_KEY_FILE.exists():
with _LEGACY_KEY_FILE.open("rb") as fh:
return serialization.load_pem_private_key(fh.read(), password=None)
private_key = ed25519.Ed25519PrivateKey.generate()
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
with _SIGNING_KEY_FILE.open("wb") as fh:
fh.write(pem)
try:
if hasattr(_SIGNING_KEY_FILE, "chmod"):
_SIGNING_KEY_FILE.chmod(0o600)
except Exception:
pass
pub_der = private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
_SIGNING_PUB_FILE.write_bytes(pub_der)
return private_key
def _migrate_legacy_material_if_present() -> None:
if _SIGNING_KEY_FILE.exists():
return
# First migrate from legacy runtime path embedded in Server runtime.
try:
if _OLD_RUNTIME_KEY_FILE.exists() and not _SIGNING_KEY_FILE.exists():
ensure_server_certificates_dir("Code-Signing")
try:
_OLD_RUNTIME_KEY_FILE.replace(_SIGNING_KEY_FILE)
except Exception:
_SIGNING_KEY_FILE.write_bytes(_OLD_RUNTIME_KEY_FILE.read_bytes())
if _OLD_RUNTIME_PUB_FILE.exists() and not _SIGNING_PUB_FILE.exists():
try:
_OLD_RUNTIME_PUB_FILE.replace(_SIGNING_PUB_FILE)
except Exception:
_SIGNING_PUB_FILE.write_bytes(_OLD_RUNTIME_PUB_FILE.read_bytes())
except Exception:
pass
if not _LEGACY_KEY_FILE.exists() or _SIGNING_KEY_FILE.exists():
return
try:
ensure_server_certificates_dir("Code-Signing")
try:
_LEGACY_KEY_FILE.replace(_SIGNING_KEY_FILE)
except Exception:
_SIGNING_KEY_FILE.write_bytes(_LEGACY_KEY_FILE.read_bytes())
if _LEGACY_PUB_FILE.exists() and not _SIGNING_PUB_FILE.exists():
try:
_LEGACY_PUB_FILE.replace(_SIGNING_PUB_FILE)
except Exception:
_SIGNING_PUB_FILE.write_bytes(_LEGACY_PUB_FILE.read_bytes())
except Exception:
return