diff --git a/Data/Agent/agent.py b/Data/Agent/agent.py index 140ae09..f4f4ab6 100644 --- a/Data/Agent/agent.py +++ b/Data/Agent/agent.py @@ -929,25 +929,29 @@ class AgentHttpClient: pass context = None + bundle_summary = {"count": None, "fingerprint": None, "layered_default": None} if isinstance(verify, str) and os.path.isfile(verify): - try: - # Mirror Requests' certificate handling by starting from a - # default client context (which pre-loads the system - # certificate stores) and then layering the pinned - # certificate bundle on top. This matches the REST client - # behaviour and ensures self-signed leaf certificates work - # the same way for Socket.IO handshakes. - context = ssl.create_default_context() - context.check_hostname = False - context.load_verify_locations(cafile=verify) + bundle_count, bundle_fp, layered_default = self.key_store.summarize_server_certificate() + bundle_summary = { + "count": bundle_count, + "fingerprint": bundle_fp, + "layered_default": layered_default, + } + context = self.key_store.build_ssl_context() + if context is not None: + if bundle_summary["layered_default"] is None: + bundle_summary["layered_default"] = getattr( + context, "_borealis_layered_default", None + ) _log_agent( - f"SocketIO TLS alignment created SSLContext from cafile={verify}", + "SocketIO TLS alignment created SSLContext from pinned bundle " + f"count={bundle_count} fp={bundle_fp or ''} " + f"layered_default={bundle_summary['layered_default']}", fname="agent.log", ) - except Exception: - context = None + else: _log_agent( - f"SocketIO TLS alignment failed to build context from cafile={verify}", + "SocketIO TLS alignment failed to build context from pinned bundle", # noqa: E501 fname="agent.error.log", ) @@ -960,7 +964,10 @@ class AgentHttpClient: _set_attr(http_iface, "verify_ssl", True) _reset_cached_session() _log_agent( - "SocketIO TLS alignment applied dedicated SSLContext to engine/http", + "SocketIO TLS alignment applied dedicated SSLContext to engine/http " + f"count={bundle_summary['count']} " + f"fp={bundle_summary['fingerprint'] or ''} " + f"layered_default={bundle_summary['layered_default']}", fname="agent.log", ) return diff --git a/Data/Agent/security.py b/Data/Agent/security.py index 30b935d..2db54c7 100644 --- a/Data/Agent/security.py +++ b/Data/Agent/security.py @@ -12,11 +12,18 @@ import platform import stat import time from dataclasses import dataclass -from typing import Optional +from typing import List, Optional, Tuple + +import ssl from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519 +try: + from cryptography import x509 # type: ignore +except Exception: # pragma: no cover - optional dependency guard + x509 = None # type: ignore + IS_WINDOWS = platform.system().lower().startswith("win") try: @@ -371,6 +378,41 @@ class AgentKeyStore: def server_certificate_path(self) -> str: return self._server_certificate_path + def describe_server_certificate(self) -> Tuple[int, Optional[str]]: + """Return (certificate_count, sha256_fingerprint_prefix).""" + + count, fingerprint, _ = self.summarize_server_certificate() + return count, fingerprint + + def summarize_server_certificate(self) -> Tuple[int, Optional[str], bool]: + """Return (certificate_count, fingerprint_prefix, layered_default_trust).""" + + pem_bytes, certs = self._load_server_certificates() + if not pem_bytes: + return 0, None, False + + fingerprint = None + if certs: + try: + first_cert = certs[0] + fingerprint = hashlib.sha256( + first_cert.public_bytes(serialization.Encoding.DER) + ).hexdigest() + except Exception: + fingerprint = None + else: + try: + pem_text = pem_bytes.decode("utf-8") + der_bytes = ssl.PEM_cert_to_DER_cert(pem_text) + fingerprint = hashlib.sha256(der_bytes).hexdigest() + except Exception: + fingerprint = None + + count = len(certs) if certs else 1 + prefix = fingerprint[:12] if fingerprint else None + include_default = self._should_layer_default_trust(certs) + return count, prefix, include_default + def save_server_certificate(self, pem_text: str) -> None: if not pem_text: return @@ -392,6 +434,133 @@ class AgentKeyStore: return None return None + def build_ssl_context(self) -> Optional[ssl.SSLContext]: + pem_bytes, certs = self._load_server_certificates() + if not pem_bytes: + return None + + try: + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + except Exception: + try: + context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH) + except Exception: + return None + + try: + context.check_hostname = True + except Exception: + pass + + try: + context.verify_mode = ssl.CERT_REQUIRED + except Exception: + pass + + if hasattr(context, "minimum_version"): + try: + context.minimum_version = ssl.TLSVersion.TLSv1_2 + except Exception: + pass + + pem_text = None + try: + pem_text = pem_bytes.decode("utf-8") + except Exception: + pass + + loaded = False + if pem_text: + try: + context.load_verify_locations(cadata=pem_text) + loaded = True + except Exception: + loaded = False + + if not loaded: + try: + context.load_verify_locations(cafile=self._server_certificate_path) + loaded = True + except Exception: + loaded = False + + if not loaded: + return None + + include_default = self._should_layer_default_trust(certs) + try: + setattr(context, "_borealis_layered_default", include_default) + except Exception: + pass + + if include_default: + try: + context.load_default_certs() + except Exception: + pass + + verify_flag = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", None) + if verify_flag is not None: + try: + context.verify_flags |= verify_flag # type: ignore[attr-defined] + except Exception: + pass + + return context + + # ------------------------------------------------------------------ + # Server certificate helpers (internal) + # ------------------------------------------------------------------ + def _load_server_certificates(self) -> Tuple[Optional[bytes], List["x509.Certificate"]]: + try: + if not os.path.isfile(self._server_certificate_path): + return None, [] + with open(self._server_certificate_path, "rb") as fh: + pem_bytes = fh.read() + except Exception: + return None, [] + + if not pem_bytes.strip(): + return None, [] + + if x509 is None: + return pem_bytes, [] + + terminator = b"-----END CERTIFICATE-----" + certs: List["x509.Certificate"] = [] + for chunk in pem_bytes.split(terminator): + if b"-----BEGIN CERTIFICATE-----" not in chunk: + continue + block = chunk + terminator + b"\n" + try: + cert = x509.load_pem_x509_certificate(block) + except Exception: + continue + certs.append(cert) + + return pem_bytes, certs + + def _should_layer_default_trust(self, certs: List["x509.Certificate"]) -> bool: + if not certs: + return True + + try: + first_cert = certs[0] + is_self_issued = first_cert.issuer == first_cert.subject + except Exception: + return True + + if not is_self_issued: + return True + + try: + basic = first_cert.extensions.get_extension_for_class(x509.BasicConstraints) # type: ignore[attr-defined] + is_ca = bool(basic.value.ca) + except Exception: + is_ca = False + + return is_ca + def save_server_signing_key(self, value: str) -> None: if not value: return