diff --git a/src/tether/admin/_client.py b/src/tether/admin/_client.py index daf9e4c..9622662 100644 --- a/src/tether/admin/_client.py +++ b/src/tether/admin/_client.py @@ -7,6 +7,22 @@ DEFAULT_LICENSE_ENDPOINT = "https://tether-licenses.fastcrest.workers.dev" +_LOCAL_HOSTS = ("localhost", "127.0.0.1", "::1", "0.0.0.0") + + +class AdminError(Exception): + """An admin command failed (auth / network / HTTP / bad input). + + Carries the process exit code so a command's ``main()`` can ``return + exc.exit_code`` instead of the library calling ``sys.exit`` directly. + Raising rather than exiting is what makes the admin commands unit-testable + and reusable as a library. + """ + + def __init__(self, message: str, *, exit_code: int = 2): + super().__init__(message) + self.exit_code = exit_code + def get_endpoint() -> str: return os.environ.get("TETHER_LICENSE_ENDPOINT", DEFAULT_LICENSE_ENDPOINT).rstrip("/") @@ -15,58 +31,82 @@ def get_endpoint() -> str: def get_admin_token() -> str: token = os.environ.get("TETHER_ADMIN_TOKEN", "").strip() if not token: - sys.stderr.write( - "ERROR: TETHER_ADMIN_TOKEN env var is not set.\n" + raise AdminError( + "TETHER_ADMIN_TOKEN env var is not set.\n" " Set it to the bearer token you configured on the worker via\n" - " `wrangler secret put ADMIN_TOKEN`.\n" + " `wrangler secret put ADMIN_TOKEN`." ) - sys.exit(2) return token +def _warn_insecure_endpoint(url: str) -> None: + """Warn if the admin bearer token would go out over cleartext HTTP. + + Non-HTTPS to a non-local host sends the admin secret in the clear; surface + it loudly rather than silently leaking the token. + """ + if url.startswith("https://"): + return + host = url.split("://", 1)[-1].split("/", 1)[0].split(":", 1)[0] + if host in _LOCAL_HOSTS: + return + sys.stderr.write( + f"WARNING: sending the admin token over a non-HTTPS endpoint ({url}).\n" + " The bearer token is transmitted in cleartext. Use an https:// " + "TETHER_LICENSE_ENDPOINT for anything but local testing.\n" + ) + + def admin_request(method: str, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]: """Make an authenticated admin request to the license worker. - Exits the process with code 2 on auth/network/HTTP errors so admin scripts - fail loudly. + Raises ``AdminError`` (exit_code 2) on auth / network / HTTP / unsupported- + method errors so admin commands fail loudly while staying testable. """ + method = method.upper() + if method not in ("GET", "POST"): + # Raise BEFORE the network try-block so a programmer error isn't + # misreported as "could not reach worker". + raise AdminError(f"Unsupported HTTP method: {method!r}") + try: import httpx - except ImportError: - sys.stderr.write("ERROR: httpx not installed. Run: pip install httpx\n") - sys.exit(2) + except ImportError as exc: + raise AdminError("httpx not installed. Run: pip install httpx") from exc url = f"{get_endpoint()}{path}" + _warn_insecure_endpoint(url) headers = {"Authorization": f"Bearer {get_admin_token()}"} try: if method == "GET": resp = httpx.get(url, headers=headers, timeout=15.0) - elif method == "POST": + else: # POST resp = httpx.post(url, headers=headers, json=body or {}, timeout=15.0) - else: - raise ValueError(f"Unsupported method: {method}") - except Exception as exc: # noqa: BLE001 - sys.stderr.write(f"ERROR: Could not reach worker at {url}: {exc}\n") - sys.exit(2) + except httpx.HTTPError as exc: + raise AdminError(f"Could not reach worker at {url}: {exc}") from exc if resp.status_code == 401: - sys.stderr.write( - "ERROR: 401 Unauthorized. Check that TETHER_ADMIN_TOKEN matches\n" - " the value set via `wrangler secret put ADMIN_TOKEN`.\n" + raise AdminError( + "401 Unauthorized. Check that TETHER_ADMIN_TOKEN matches\n" + " the value set via `wrangler secret put ADMIN_TOKEN`." ) - sys.exit(2) try: body_json = resp.json() - except Exception: # noqa: BLE001 + except Exception: # noqa: BLE001 — worker returned non-JSON body_json = {"raw": resp.text} if resp.status_code >= 400: - sys.stderr.write(f"ERROR: Worker returned HTTP {resp.status_code}: {body_json}\n") - sys.exit(2) + raise AdminError(f"Worker returned HTTP {resp.status_code}: {body_json}") return body_json -__all__ = ["DEFAULT_LICENSE_ENDPOINT", "admin_request", "get_admin_token", "get_endpoint"] +__all__ = [ + "DEFAULT_LICENSE_ENDPOINT", + "AdminError", + "admin_request", + "get_admin_token", + "get_endpoint", +] diff --git a/src/tether/admin/issue_license.py b/src/tether/admin/issue_license.py index b174cbe..3efe525 100644 --- a/src/tether/admin/issue_license.py +++ b/src/tether/admin/issue_license.py @@ -16,7 +16,7 @@ import argparse import sys -from tether.admin._client import admin_request +from tether.admin._client import AdminError, admin_request VALID_TIERS = ["trial", "pro", "team", "enterprise", "educational", "research", "oss"] @@ -35,6 +35,9 @@ def main(argv: list[str] | None = None) -> int: if args.expires_in < 1: sys.stderr.write("ERROR: --expires-in must be >= 1\n") return 2 + if args.max_seats < 1: + sys.stderr.write("ERROR: --max-seats must be >= 1\n") + return 2 body = { "customer_id": args.customer_id, @@ -44,13 +47,17 @@ def main(argv: list[str] | None = None) -> int: "notes": args.notes, } - resp = admin_request("POST", "/admin/issue", body) + try: + resp = admin_request("POST", "/admin/issue", body) + except AdminError as exc: + sys.stderr.write(f"ERROR: {exc}\n") + return exc.exit_code license_id = resp.get("license_id", "?") code = resp.get("activation_code", "?") code_expires = resp.get("activation_expires_at", "?") - license = resp.get("license", {}) - expires = license.get("expires_at", "?") + lic = resp.get("license", {}) + expires = lic.get("expires_at", "?") print() print(f" License issued for {args.customer_id}") diff --git a/src/tether/admin/list_licenses.py b/src/tether/admin/list_licenses.py index 4d76a5a..84db052 100644 --- a/src/tether/admin/list_licenses.py +++ b/src/tether/admin/list_licenses.py @@ -10,7 +10,7 @@ import json import sys -from tether.admin._client import admin_request +from tether.admin._client import AdminError, admin_request def main(argv: list[str] | None = None) -> int: @@ -19,7 +19,15 @@ def main(argv: list[str] | None = None) -> int: parser.add_argument("--json", action="store_true", help="Output raw JSON (for scripts).") args = parser.parse_args(argv) - resp = admin_request("GET", f"/admin/list?limit={args.limit}") + if args.limit < 1: + sys.stderr.write("ERROR: --limit must be >= 1\n") + return 2 + + try: + resp = admin_request("GET", f"/admin/list?limit={args.limit}") + except AdminError as exc: + sys.stderr.write(f"ERROR: {exc}\n") + return exc.exit_code licenses = resp.get("licenses", []) if args.json: diff --git a/src/tether/admin/revoke_license.py b/src/tether/admin/revoke_license.py index e715411..3ac5b3e 100644 --- a/src/tether/admin/revoke_license.py +++ b/src/tether/admin/revoke_license.py @@ -12,7 +12,7 @@ import argparse import sys -from tether.admin._client import admin_request +from tether.admin._client import AdminError, admin_request def main(argv: list[str] | None = None) -> int: @@ -21,10 +21,14 @@ def main(argv: list[str] | None = None) -> int: parser.add_argument("--reason", default="admin_revoke", help="Reason for revocation (admin audit).") args = parser.parse_args(argv) - resp = admin_request("POST", "/admin/revoke", { - "license_id": args.license_id, - "reason": args.reason, - }) + try: + resp = admin_request("POST", "/admin/revoke", { + "license_id": args.license_id, + "reason": args.reason, + }) + except AdminError as exc: + sys.stderr.write(f"ERROR: {exc}\n") + return exc.exit_code print() print(f" License revoked: {resp.get('license_id')}") diff --git a/tests/test_admin.py b/tests/test_admin.py new file mode 100644 index 0000000..e1aa1b3 --- /dev/null +++ b/tests/test_admin.py @@ -0,0 +1,138 @@ +"""Tests for the admin CLI (license issuance / listing / revocation). + +These exist because the prior `sys.exit`-from-library design made admin/ +untestable. The refactor raises AdminError instead, so we can assert on +behavior here without spawning subprocesses. +""" +from __future__ import annotations + +import httpx +import pytest + +from tether.admin import issue_license, list_licenses, revoke_license +from tether.admin._client import AdminError, admin_request, get_admin_token + + +class _FakeResp: + def __init__(self, status_code: int, payload=None, text: str = ""): + self.status_code = status_code + self._payload = payload + self.text = text + + def json(self): + if self._payload is None: + raise ValueError("no json") + return self._payload + + +# ---- _client.get_admin_token -------------------------------------------- + +def test_get_admin_token_missing_raises(monkeypatch): + monkeypatch.delenv("TETHER_ADMIN_TOKEN", raising=False) + with pytest.raises(AdminError) as ei: + get_admin_token() + assert ei.value.exit_code == 2 + assert "TETHER_ADMIN_TOKEN" in str(ei.value) + + +def test_get_admin_token_present(monkeypatch): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", " secret ") + assert get_admin_token() == "secret" + + +# ---- _client.admin_request ---------------------------------------------- + +def test_unsupported_method_is_not_a_network_error(monkeypatch): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t") + with pytest.raises(AdminError) as ei: + admin_request("DELETE", "/admin/whatever") + # The bug was this being reported as "could not reach worker". + assert "Unsupported HTTP method" in str(ei.value) + assert "reach worker" not in str(ei.value) + + +def test_success_returns_json(monkeypatch): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t") + monkeypatch.setattr(httpx, "get", lambda *a, **k: _FakeResp(200, {"licenses": []})) + assert admin_request("GET", "/admin/list") == {"licenses": []} + + +def test_401_raises_auth_error(monkeypatch): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t") + monkeypatch.setattr(httpx, "post", lambda *a, **k: _FakeResp(401)) + with pytest.raises(AdminError) as ei: + admin_request("POST", "/admin/issue", {}) + assert "401" in str(ei.value) + + +def test_4xx_raises(monkeypatch): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t") + monkeypatch.setattr(httpx, "post", lambda *a, **k: _FakeResp(500, {"error": "boom"})) + with pytest.raises(AdminError): + admin_request("POST", "/admin/issue", {}) + + +def test_network_error_wrapped(monkeypatch): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t") + + def _boom(*a, **k): + raise httpx.ConnectError("refused") + + monkeypatch.setattr(httpx, "get", _boom) + with pytest.raises(AdminError) as ei: + admin_request("GET", "/admin/list") + assert "reach worker" in str(ei.value) + + +def test_insecure_endpoint_warns(monkeypatch, capsys): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t") + monkeypatch.setenv("TETHER_LICENSE_ENDPOINT", "http://license.example.com") + monkeypatch.setattr(httpx, "get", lambda *a, **k: _FakeResp(200, {})) + admin_request("GET", "/admin/list") + assert "non-HTTPS" in capsys.readouterr().err + + +def test_localhost_http_does_not_warn(monkeypatch, capsys): + monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t") + monkeypatch.setenv("TETHER_LICENSE_ENDPOINT", "http://127.0.0.1:8787") + monkeypatch.setattr(httpx, "get", lambda *a, **k: _FakeResp(200, {})) + admin_request("GET", "/admin/list") + assert "non-HTTPS" not in capsys.readouterr().err + + +# ---- command main() exit codes ------------------------------------------ + +def test_issue_license_admin_error_returns_code(monkeypatch): + def _raise(*a, **k): + raise AdminError("nope", exit_code=2) + + monkeypatch.setattr(issue_license, "admin_request", _raise) + rc = issue_license.main(["--customer-id", "a@b.com"]) + assert rc == 2 + + +def test_issue_license_success(monkeypatch, capsys): + monkeypatch.setattr( + issue_license, "admin_request", + lambda *a, **k: {"license_id": "lic_1", "activation_code": "REFLEX-AAAA", + "license": {"expires_at": "2026-12-01"}}, + ) + rc = issue_license.main(["--customer-id", "a@b.com", "--tier", "pro"]) + assert rc == 0 + assert "lic_1" in capsys.readouterr().out + + +def test_issue_license_bad_max_seats(monkeypatch): + assert issue_license.main(["--customer-id", "a@b.com", "--max-seats", "0"]) == 2 + + +def test_list_licenses_bad_limit(): + assert list_licenses.main(["--limit", "0"]) == 2 + + +def test_revoke_license_admin_error_returns_code(monkeypatch): + def _raise(*a, **k): + raise AdminError("gone", exit_code=2) + + monkeypatch.setattr(revoke_license, "admin_request", _raise) + assert revoke_license.main(["--license-id", "lic_x"]) == 2