mirror of
https://github.com/bunny-lab-io/Borealis.git
synced 2025-10-26 17:21:58 -06:00
Refine pinned certificate handling for Socket.IO
This commit is contained in:
@@ -12,13 +12,18 @@ import platform
|
||||
import stat
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Tuple
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import ssl
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ed25519
|
||||
|
||||
try:
|
||||
from cryptography import x509 # type: ignore
|
||||
except Exception: # pragma: no cover - optional dependency guard
|
||||
x509 = None # type: ignore
|
||||
|
||||
IS_WINDOWS = platform.system().lower().startswith("win")
|
||||
|
||||
try:
|
||||
@@ -376,46 +381,37 @@ class AgentKeyStore:
|
||||
def describe_server_certificate(self) -> Tuple[int, Optional[str]]:
|
||||
"""Return (certificate_count, sha256_fingerprint_prefix)."""
|
||||
|
||||
try:
|
||||
if not os.path.isfile(self._server_certificate_path):
|
||||
return 0, None
|
||||
with open(self._server_certificate_path, "rb") as fh:
|
||||
pem_data = fh.read()
|
||||
except Exception:
|
||||
return 0, None
|
||||
count, fingerprint, _ = self.summarize_server_certificate()
|
||||
return count, fingerprint
|
||||
|
||||
if not pem_data:
|
||||
return 0, None
|
||||
def summarize_server_certificate(self) -> Tuple[int, Optional[str], bool]:
|
||||
"""Return (certificate_count, fingerprint_prefix, layered_default_trust)."""
|
||||
|
||||
try:
|
||||
from cryptography import x509 # type: ignore
|
||||
except Exception:
|
||||
return 0, None
|
||||
pem_bytes, certs = self._load_server_certificates()
|
||||
if not pem_bytes:
|
||||
return 0, None, False
|
||||
|
||||
certs = []
|
||||
for chunk in pem_data.split(b"-----END CERTIFICATE-----"):
|
||||
if b"-----BEGIN CERTIFICATE-----" not in chunk:
|
||||
continue
|
||||
block = chunk + b"-----END CERTIFICATE-----\n"
|
||||
fingerprint = None
|
||||
if certs:
|
||||
try:
|
||||
cert = x509.load_pem_x509_certificate(block)
|
||||
first_cert = certs[0]
|
||||
fingerprint = hashlib.sha256(
|
||||
first_cert.public_bytes(serialization.Encoding.DER)
|
||||
).hexdigest()
|
||||
except Exception:
|
||||
continue
|
||||
certs.append(cert)
|
||||
|
||||
if not certs:
|
||||
return 0, None
|
||||
|
||||
try:
|
||||
first_cert = certs[0]
|
||||
fingerprint = hashlib.sha256(
|
||||
first_cert.public_bytes(serialization.Encoding.DER)
|
||||
).hexdigest()
|
||||
except Exception:
|
||||
fingerprint = None
|
||||
fingerprint = None
|
||||
else:
|
||||
try:
|
||||
pem_text = pem_bytes.decode("utf-8")
|
||||
der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
|
||||
fingerprint = hashlib.sha256(der_bytes).hexdigest()
|
||||
except Exception:
|
||||
fingerprint = None
|
||||
|
||||
count = len(certs) if certs else 1
|
||||
prefix = fingerprint[:12] if fingerprint else None
|
||||
return len(certs), prefix
|
||||
include_default = self._should_layer_default_trust(certs)
|
||||
return count, prefix, include_default
|
||||
|
||||
def save_server_certificate(self, pem_text: str) -> None:
|
||||
if not pem_text:
|
||||
@@ -439,14 +435,15 @@ class AgentKeyStore:
|
||||
return None
|
||||
|
||||
def build_ssl_context(self) -> Optional[ssl.SSLContext]:
|
||||
if not os.path.isfile(self._server_certificate_path):
|
||||
pem_bytes, certs = self._load_server_certificates()
|
||||
if not pem_bytes:
|
||||
return None
|
||||
|
||||
try:
|
||||
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
except Exception:
|
||||
try:
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@@ -466,26 +463,42 @@ class AgentKeyStore:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
loaded = False
|
||||
pem_text = None
|
||||
try:
|
||||
context.load_verify_locations(cafile=self._server_certificate_path)
|
||||
loaded = True
|
||||
pem_text = pem_bytes.decode("utf-8")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
loaded = False
|
||||
if pem_text:
|
||||
try:
|
||||
context.load_verify_locations(cadata=pem_text)
|
||||
loaded = True
|
||||
except Exception:
|
||||
loaded = False
|
||||
|
||||
if not loaded:
|
||||
try:
|
||||
with open(self._server_certificate_path, "r", encoding="utf-8") as fh:
|
||||
pem_text = fh.read()
|
||||
if pem_text:
|
||||
context.load_verify_locations(cadata=pem_text)
|
||||
loaded = True
|
||||
context.load_verify_locations(cafile=self._server_certificate_path)
|
||||
loaded = True
|
||||
except Exception:
|
||||
loaded = False
|
||||
|
||||
if not loaded:
|
||||
return None
|
||||
|
||||
include_default = self._should_layer_default_trust(certs)
|
||||
try:
|
||||
setattr(context, "_borealis_layered_default", include_default)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if include_default:
|
||||
try:
|
||||
context.load_default_certs()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
verify_flag = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", None)
|
||||
if verify_flag is not None:
|
||||
try:
|
||||
@@ -493,13 +506,61 @@ class AgentKeyStore:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
context.load_default_certs()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return context
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Server certificate helpers (internal)
|
||||
# ------------------------------------------------------------------
|
||||
def _load_server_certificates(self) -> Tuple[Optional[bytes], List["x509.Certificate"]]:
|
||||
try:
|
||||
if not os.path.isfile(self._server_certificate_path):
|
||||
return None, []
|
||||
with open(self._server_certificate_path, "rb") as fh:
|
||||
pem_bytes = fh.read()
|
||||
except Exception:
|
||||
return None, []
|
||||
|
||||
if not pem_bytes.strip():
|
||||
return None, []
|
||||
|
||||
if x509 is None:
|
||||
return pem_bytes, []
|
||||
|
||||
terminator = b"-----END CERTIFICATE-----"
|
||||
certs: List["x509.Certificate"] = []
|
||||
for chunk in pem_bytes.split(terminator):
|
||||
if b"-----BEGIN CERTIFICATE-----" not in chunk:
|
||||
continue
|
||||
block = chunk + terminator + b"\n"
|
||||
try:
|
||||
cert = x509.load_pem_x509_certificate(block)
|
||||
except Exception:
|
||||
continue
|
||||
certs.append(cert)
|
||||
|
||||
return pem_bytes, certs
|
||||
|
||||
def _should_layer_default_trust(self, certs: List["x509.Certificate"]) -> bool:
|
||||
if not certs:
|
||||
return True
|
||||
|
||||
try:
|
||||
first_cert = certs[0]
|
||||
is_self_issued = first_cert.issuer == first_cert.subject
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
if not is_self_issued:
|
||||
return True
|
||||
|
||||
try:
|
||||
basic = first_cert.extensions.get_extension_for_class(x509.BasicConstraints) # type: ignore[attr-defined]
|
||||
is_ca = bool(basic.value.ca)
|
||||
except Exception:
|
||||
is_ca = False
|
||||
|
||||
return is_ca
|
||||
|
||||
def save_server_signing_key(self, value: str) -> None:
|
||||
if not value:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user