Major Progress Towards Interactive Remote Powershell

This commit is contained in:
2025-12-06 00:27:57 -07:00
parent 52e40c3753
commit 68dd46347b
9 changed files with 1247 additions and 53 deletions

View File

@@ -0,0 +1,90 @@
# ======================================================
# Data\Engine\Unit_Tests\test_reverse_tunnel.py
# Description: Validates reverse tunnel lease API basics (allocation, token contents, and domain limit).
#
# API Endpoints (if applicable):
# - POST /api/tunnel/request
# ======================================================
from __future__ import annotations
import base64
import json
import pytest
from .conftest import EngineTestHarness
def _client_with_admin_session(harness: EngineTestHarness):
client = harness.app.test_client()
with client.session_transaction() as sess:
sess["username"] = "admin"
sess["role"] = "Admin"
return client
def _decode_token_segment(token: str) -> dict:
"""Decode the unsigned payload segment from the tunnel token."""
if not token:
return {}
segment = token.split(".")[0]
padding = "=" * (-len(segment) % 4)
raw = base64.urlsafe_b64decode(segment + padding)
try:
return json.loads(raw.decode("utf-8"))
except Exception:
return {}
@pytest.mark.parametrize("agent_id", ["test-device-agent"])
def test_tunnel_request_happy_path(engine_harness: EngineTestHarness, agent_id: str) -> None:
client = _client_with_admin_session(engine_harness)
resp = client.post(
"/api/tunnel/request",
json={"agent_id": agent_id, "protocol": "ps", "domain": "ps"},
)
assert resp.status_code == 200
payload = resp.get_json()
assert payload["agent_id"] == agent_id
assert payload["protocol"] == "ps"
assert payload["domain"] == "ps"
assert isinstance(payload["port"], int) and payload["port"] >= 30000
assert payload.get("token")
claims = _decode_token_segment(payload["token"])
assert claims.get("agent_id") == agent_id
assert claims.get("protocol") == "ps"
assert claims.get("domain") == "ps"
assert claims.get("tunnel_id") == payload["tunnel_id"]
assert claims.get("assigned_port") == payload["port"]
def test_tunnel_request_domain_limit(engine_harness: EngineTestHarness) -> None:
client = _client_with_admin_session(engine_harness)
first = client.post(
"/api/tunnel/request",
json={"agent_id": "test-device-agent", "protocol": "ps", "domain": "ps"},
)
assert first.status_code == 200
second = client.post(
"/api/tunnel/request",
json={"agent_id": "test-device-agent", "protocol": "ps", "domain": "ps"},
)
assert second.status_code == 409
data = second.get_json()
assert data.get("error") == "domain_limit"
def test_tunnel_request_includes_timeouts(engine_harness: EngineTestHarness) -> None:
client = _client_with_admin_session(engine_harness)
resp = client.post(
"/api/tunnel/request",
json={"agent_id": "test-device-agent", "protocol": "ps", "domain": "ps"},
)
assert resp.status_code == 200
payload = resp.get_json()
assert payload.get("idle_seconds") and payload["idle_seconds"] > 0
assert payload.get("grace_seconds") and payload["grace_seconds"] > 0
assert payload.get("expires_at") and int(payload["expires_at"]) > 0

View File

@@ -0,0 +1,101 @@
# ======================================================
# Data\Engine\Unit_Tests\test_reverse_tunnel_integration.py
# Description: Integration test that exercises a full reverse tunnel PowerShell round-trip
# against a running Engine + Agent (requires live services).
#
# Requirements:
# - Environment variables must be set to point at a live Engine + Agent:
# TUNNEL_TEST_HOST (e.g., https://localhost:5000)
# TUNNEL_TEST_AGENT_ID (agent_id/agent_guid for the target device)
# TUNNEL_TEST_BEARER (Authorization bearer token for an admin/operator)
# - A live Agent must be reachable and allowed to establish the reverse tunnel.
# - TLS verification can be controlled via TUNNEL_TEST_VERIFY ("false" to disable).
#
# API Endpoints (if applicable):
# - POST /api/tunnel/request
# - Socket.IO namespace /tunnel (join, ps_open, ps_send, ps_poll)
# ======================================================
from __future__ import annotations
import os
import time
import pytest
import requests
import socketio
HOST = os.environ.get("TUNNEL_TEST_HOST", "").strip()
AGENT_ID = os.environ.get("TUNNEL_TEST_AGENT_ID", "").strip()
BEARER = os.environ.get("TUNNEL_TEST_BEARER", "").strip()
VERIFY_ENV = os.environ.get("TUNNEL_TEST_VERIFY", "").strip().lower()
VERIFY = False if VERIFY_ENV in {"false", "0", "no"} else True
SKIP_MSG = (
"Live tunnel test skipped (set TUNNEL_TEST_HOST, TUNNEL_TEST_AGENT_ID, TUNNEL_TEST_BEARER to run)"
)
def _require_env() -> None:
if not HOST or not AGENT_ID or not BEARER:
pytest.skip(SKIP_MSG)
def _make_session() -> requests.Session:
sess = requests.Session()
sess.verify = VERIFY
sess.headers.update({"Authorization": f"Bearer {BEARER}"})
return sess
def test_reverse_tunnel_powershell_roundtrip() -> None:
_require_env()
sess = _make_session()
# 1) Request a tunnel lease
resp = sess.post(
f"{HOST}/api/tunnel/request",
json={"agent_id": AGENT_ID, "protocol": "ps", "domain": "ps"},
)
assert resp.status_code == 200, f"lease request failed: {resp.status_code} {resp.text}"
lease = resp.json()
tunnel_id = lease["tunnel_id"]
# 2) Connect to Socket.IO /tunnel namespace
sio = socketio.Client()
sio.connect(
HOST,
namespaces=["/tunnel"],
headers={"Authorization": f"Bearer {BEARER}"},
transports=["websocket"],
wait_timeout=10,
)
# 3) Join tunnel and open PS channel
join_resp = sio.call("join", {"tunnel_id": tunnel_id}, namespace="/tunnel", timeout=10)
assert join_resp.get("status") == "ok", f"join failed: {join_resp}"
open_resp = sio.call("ps_open", {"cols": 120, "rows": 32}, namespace="/tunnel", timeout=10)
assert not open_resp.get("error"), f"ps_open failed: {open_resp}"
# 4) Send a command
send_resp = sio.call("ps_send", {"data": 'Write-Host "Hello World"\r\n'}, namespace="/tunnel", timeout=10)
assert not send_resp.get("error"), f"ps_send failed: {send_resp}"
# 5) Poll for output
output_text = ""
deadline = time.time() + 15
while time.time() < deadline:
poll_resp = sio.call("ps_poll", {}, namespace="/tunnel", timeout=10)
if poll_resp.get("error"):
pytest.fail(f"ps_poll failed: {poll_resp}")
lines = poll_resp.get("output") or []
output_text += "".join(lines)
if "Hello World" in output_text:
break
time.sleep(0.5)
sio.disconnect()
assert "Hello World" in output_text, f"expected command output not found; got: {output_text!r}"