mirror of
https://github.com/bunny-lab-io/Borealis.git
synced 2025-10-27 15:41:57 -06:00
120 lines
3.5 KiB
Python
120 lines
3.5 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
from typing import Any, Optional, Sequence
|
|
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ed25519
|
|
|
|
# Encoding-hint values that mark a string payload as base64; hints are
# stripped and lower-cased before being compared against this tuple.
_BASE64_HINTS: Sequence[str] = ("base64", "b64", "base-64")
|
|
|
|
|
|
def _decode_base64_bytes(value: Optional[str]) -> Optional[bytes]:
    """
    Decode a base64 string, ignoring any whitespace it contains.

    Args:
        value: Candidate base64 text. Anything that is not a ``str``
            (including ``None`` and ``bytes``) is rejected.

    Returns:
        ``None`` when *value* is not a ``str`` or is not valid base64,
        ``b""`` when *value* is empty or whitespace-only, otherwise the
        decoded bytes.
    """
    if not isinstance(value, str):
        return None

    # split() with no separator discards leading, trailing and interior
    # whitespace in one pass, so line-wrapped base64 is accepted.
    cleaned = "".join(value.split())
    if not cleaned:
        return b""

    try:
        return base64.b64decode(cleaned, validate=True)
    except ValueError:
        # binascii.Error (bad alphabet or padding) is a ValueError subclass;
        # a plain ValueError is raised for non-ASCII input text.
        return None
|
|
|
|
|
|
def decode_script_bytes(script_content: Any, encoding_hint: Optional[str]) -> Optional[bytes]:
    """
    Turn a script payload into a ``bytes`` object.

    * ``bytes`` / ``bytearray`` input is returned as ``bytes`` unchanged.
    * ``None`` is treated as empty content and yields ``b""``.
    * ``str`` input is base64-decoded when *encoding_hint* names base64;
      otherwise base64 decoding is attempted first and UTF-8 encoding is
      the fallback.
    * Any other type yields ``None``.

    ``None`` is also returned when a base64-hinted string is not valid
    base64, or when the string cannot be encoded as UTF-8.
    """
    if script_content is None:
        return b""
    if isinstance(script_content, (bytes, bytearray)):
        return bytes(script_content)

    hint = str(encoding_hint or "").strip().lower()
    if not isinstance(script_content, str):
        return None

    from_base64 = _decode_base64_bytes(script_content)
    # With an explicit base64 hint the decode result is final, even when it
    # is None. Without a hint, a successful decode still wins.
    # NOTE(review): this means un-hinted plain text that happens to be valid
    # base64 (e.g. "test") comes back decoded rather than UTF-8 encoded.
    if hint in _BASE64_HINTS or from_base64 is not None:
        return from_base64

    try:
        return script_content.encode("utf-8")
    except Exception:
        return None
|
|
|
|
|
|
def verify_and_store_script_signature(
    client: Any,
    script_bytes: bytes,
    signature_b64: Optional[str],
    signing_key_hint: Optional[str] = None,
) -> bool:
    """
    Check an Ed25519 signature over a script payload and remember the key
    that verified it.

    Candidate public keys (base64-encoded DER) are tried in order: the
    caller-supplied *signing_key_hint* first, then the key returned by
    ``client.load_server_signing_key()`` if the client offers that method.
    On the first successful verification the winning key is passed to
    ``client.store_server_signing_key()`` unless it already equals the
    stored key, and ``True`` is returned. Failures while loading or storing
    the key are swallowed.

    Returns ``False`` when *script_bytes* is not bytes-like, the signature
    is not valid base64, or no candidate key verifies the payload.
    """
    if not isinstance(script_bytes, (bytes, bytearray)):
        return False

    signature = _decode_base64_bytes(signature_b64)
    if signature is None:
        return False

    # NOTE(review): the hinted key is tried before the stored key and, when it
    # verifies, replaces the stored key. Confirm the hint always arrives over
    # a trusted channel.
    hinted_key = signing_key_hint.strip() if isinstance(signing_key_hint, str) else ""
    candidates = [hinted_key] if hinted_key else []

    stored_key = None
    if client is not None and hasattr(client, "load_server_signing_key"):
        try:
            loaded = client.load_server_signing_key()
        except Exception:
            loaded = None
        if isinstance(loaded, str):
            stored_key = loaded.strip()
            if stored_key and stored_key not in candidates:
                candidates.append(stored_key)

    payload = bytes(script_bytes)
    for key_b64 in candidates:
        # Skip candidates that are not base64, not DER, or not Ed25519 keys.
        try:
            public_key = serialization.load_der_public_key(
                base64.b64decode(key_b64, validate=True)
            )
        except Exception:
            continue
        if not isinstance(public_key, ed25519.Ed25519PublicKey):
            continue

        try:
            public_key.verify(signature, payload)
        except Exception:
            continue

        # Verified: persist the key unless it is already the stored one.
        if client is not None and hasattr(client, "store_server_signing_key"):
            try:
                if not stored_key or stored_key != key_b64:
                    client.store_server_signing_key(key_b64)
            except Exception:
                # Best effort only; a storage failure must not fail a valid signature.
                pass
        return True

    return False
|