Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 63 additions & 23 deletions src/tether/admin/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@

DEFAULT_LICENSE_ENDPOINT = "https://tether-licenses.fastcrest.workers.dev"

_LOCAL_HOSTS = ("localhost", "127.0.0.1", "::1", "0.0.0.0")


class AdminError(Exception):
"""An admin command failed (auth / network / HTTP / bad input).

Carries the process exit code so a command's ``main()`` can ``return
exc.exit_code`` instead of the library calling ``sys.exit`` directly.
Raising rather than exiting is what makes the admin commands unit-testable
and reusable as a library.
"""

def __init__(self, message: str, *, exit_code: int = 2):
super().__init__(message)
self.exit_code = exit_code


def get_endpoint() -> str:
return os.environ.get("TETHER_LICENSE_ENDPOINT", DEFAULT_LICENSE_ENDPOINT).rstrip("/")
Expand All @@ -15,58 +31,82 @@ def get_endpoint() -> str:
def get_admin_token() -> str:
token = os.environ.get("TETHER_ADMIN_TOKEN", "").strip()
if not token:
sys.stderr.write(
"ERROR: TETHER_ADMIN_TOKEN env var is not set.\n"
raise AdminError(
"TETHER_ADMIN_TOKEN env var is not set.\n"
" Set it to the bearer token you configured on the worker via\n"
" `wrangler secret put ADMIN_TOKEN`.\n"
" `wrangler secret put ADMIN_TOKEN`."
)
sys.exit(2)
return token


def _warn_insecure_endpoint(url: str) -> None:
"""Warn if the admin bearer token would go out over cleartext HTTP.

Non-HTTPS to a non-local host sends the admin secret in the clear; surface
it loudly rather than silently leaking the token.
"""
if url.startswith("https://"):
return
host = url.split("://", 1)[-1].split("/", 1)[0].split(":", 1)[0]
if host in _LOCAL_HOSTS:
return
sys.stderr.write(
f"WARNING: sending the admin token over a non-HTTPS endpoint ({url}).\n"
" The bearer token is transmitted in cleartext. Use an https:// "
"TETHER_LICENSE_ENDPOINT for anything but local testing.\n"
)


def admin_request(method: str, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]:
"""Make an authenticated admin request to the license worker.

Exits the process with code 2 on auth/network/HTTP errors so admin scripts
fail loudly.
Raises ``AdminError`` (exit_code 2) on auth / network / HTTP / unsupported-
method errors so admin commands fail loudly while staying testable.
"""
method = method.upper()
if method not in ("GET", "POST"):
# Raise BEFORE the network try-block so a programmer error isn't
# misreported as "could not reach worker".
raise AdminError(f"Unsupported HTTP method: {method!r}")

try:
import httpx
except ImportError:
sys.stderr.write("ERROR: httpx not installed. Run: pip install httpx\n")
sys.exit(2)
except ImportError as exc:
raise AdminError("httpx not installed. Run: pip install httpx") from exc

url = f"{get_endpoint()}{path}"
_warn_insecure_endpoint(url)
headers = {"Authorization": f"Bearer {get_admin_token()}"}

try:
if method == "GET":
resp = httpx.get(url, headers=headers, timeout=15.0)
elif method == "POST":
else: # POST
resp = httpx.post(url, headers=headers, json=body or {}, timeout=15.0)
else:
raise ValueError(f"Unsupported method: {method}")
except Exception as exc: # noqa: BLE001
sys.stderr.write(f"ERROR: Could not reach worker at {url}: {exc}\n")
sys.exit(2)
except httpx.HTTPError as exc:
raise AdminError(f"Could not reach worker at {url}: {exc}") from exc

if resp.status_code == 401:
sys.stderr.write(
"ERROR: 401 Unauthorized. Check that TETHER_ADMIN_TOKEN matches\n"
" the value set via `wrangler secret put ADMIN_TOKEN`.\n"
raise AdminError(
"401 Unauthorized. Check that TETHER_ADMIN_TOKEN matches\n"
" the value set via `wrangler secret put ADMIN_TOKEN`."
)
sys.exit(2)

try:
body_json = resp.json()
except Exception: # noqa: BLE001
except Exception: # noqa: BLE001 — worker returned non-JSON
body_json = {"raw": resp.text}

if resp.status_code >= 400:
sys.stderr.write(f"ERROR: Worker returned HTTP {resp.status_code}: {body_json}\n")
sys.exit(2)
raise AdminError(f"Worker returned HTTP {resp.status_code}: {body_json}")

return body_json


__all__ = ["DEFAULT_LICENSE_ENDPOINT", "admin_request", "get_admin_token", "get_endpoint"]
__all__ = [
"DEFAULT_LICENSE_ENDPOINT",
"AdminError",
"admin_request",
"get_admin_token",
"get_endpoint",
]
15 changes: 11 additions & 4 deletions src/tether/admin/issue_license.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import argparse
import sys

from tether.admin._client import admin_request
from tether.admin._client import AdminError, admin_request

VALID_TIERS = ["trial", "pro", "team", "enterprise", "educational", "research", "oss"]

Expand All @@ -35,6 +35,9 @@ def main(argv: list[str] | None = None) -> int:
if args.expires_in < 1:
sys.stderr.write("ERROR: --expires-in must be >= 1\n")
return 2
if args.max_seats < 1:
sys.stderr.write("ERROR: --max-seats must be >= 1\n")
return 2

body = {
"customer_id": args.customer_id,
Expand All @@ -44,13 +47,17 @@ def main(argv: list[str] | None = None) -> int:
"notes": args.notes,
}

resp = admin_request("POST", "/admin/issue", body)
try:
resp = admin_request("POST", "/admin/issue", body)
except AdminError as exc:
sys.stderr.write(f"ERROR: {exc}\n")
return exc.exit_code

license_id = resp.get("license_id", "?")
code = resp.get("activation_code", "?")
code_expires = resp.get("activation_expires_at", "?")
license = resp.get("license", {})
expires = license.get("expires_at", "?")
lic = resp.get("license", {})
expires = lic.get("expires_at", "?")

print()
print(f" License issued for {args.customer_id}")
Expand Down
12 changes: 10 additions & 2 deletions src/tether/admin/list_licenses.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import json
import sys

from tether.admin._client import admin_request
from tether.admin._client import AdminError, admin_request


def main(argv: list[str] | None = None) -> int:
Expand All @@ -19,7 +19,15 @@ def main(argv: list[str] | None = None) -> int:
parser.add_argument("--json", action="store_true", help="Output raw JSON (for scripts).")
args = parser.parse_args(argv)

resp = admin_request("GET", f"/admin/list?limit={args.limit}")
if args.limit < 1:
sys.stderr.write("ERROR: --limit must be >= 1\n")
return 2

try:
resp = admin_request("GET", f"/admin/list?limit={args.limit}")
except AdminError as exc:
sys.stderr.write(f"ERROR: {exc}\n")
return exc.exit_code
licenses = resp.get("licenses", [])

if args.json:
Expand Down
14 changes: 9 additions & 5 deletions src/tether/admin/revoke_license.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import argparse
import sys

from tether.admin._client import admin_request
from tether.admin._client import AdminError, admin_request


def main(argv: list[str] | None = None) -> int:
Expand All @@ -21,10 +21,14 @@ def main(argv: list[str] | None = None) -> int:
parser.add_argument("--reason", default="admin_revoke", help="Reason for revocation (admin audit).")
args = parser.parse_args(argv)

resp = admin_request("POST", "/admin/revoke", {
"license_id": args.license_id,
"reason": args.reason,
})
try:
resp = admin_request("POST", "/admin/revoke", {
"license_id": args.license_id,
"reason": args.reason,
})
except AdminError as exc:
sys.stderr.write(f"ERROR: {exc}\n")
return exc.exit_code

print()
print(f" License revoked: {resp.get('license_id')}")
Expand Down
138 changes: 138 additions & 0 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Tests for the admin CLI (license issuance / listing / revocation).

These exist because the prior `sys.exit`-from-library design made admin/
untestable. The refactor raises AdminError instead, so we can assert on
behavior here without spawning subprocesses.
"""
from __future__ import annotations

import httpx
import pytest

from tether.admin import issue_license, list_licenses, revoke_license
from tether.admin._client import AdminError, admin_request, get_admin_token


class _FakeResp:
def __init__(self, status_code: int, payload=None, text: str = ""):
self.status_code = status_code
self._payload = payload
self.text = text

def json(self):
if self._payload is None:
raise ValueError("no json")
return self._payload


# ---- _client.get_admin_token --------------------------------------------

def test_get_admin_token_missing_raises(monkeypatch):
monkeypatch.delenv("TETHER_ADMIN_TOKEN", raising=False)
with pytest.raises(AdminError) as ei:
get_admin_token()
assert ei.value.exit_code == 2
assert "TETHER_ADMIN_TOKEN" in str(ei.value)


def test_get_admin_token_present(monkeypatch):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", " secret ")
assert get_admin_token() == "secret"


# ---- _client.admin_request ----------------------------------------------

def test_unsupported_method_is_not_a_network_error(monkeypatch):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t")
with pytest.raises(AdminError) as ei:
admin_request("DELETE", "/admin/whatever")
# The bug was this being reported as "could not reach worker".
assert "Unsupported HTTP method" in str(ei.value)
assert "reach worker" not in str(ei.value)


def test_success_returns_json(monkeypatch):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t")
monkeypatch.setattr(httpx, "get", lambda *a, **k: _FakeResp(200, {"licenses": []}))
assert admin_request("GET", "/admin/list") == {"licenses": []}


def test_401_raises_auth_error(monkeypatch):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t")
monkeypatch.setattr(httpx, "post", lambda *a, **k: _FakeResp(401))
with pytest.raises(AdminError) as ei:
admin_request("POST", "/admin/issue", {})
assert "401" in str(ei.value)


def test_4xx_raises(monkeypatch):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t")
monkeypatch.setattr(httpx, "post", lambda *a, **k: _FakeResp(500, {"error": "boom"}))
with pytest.raises(AdminError):
admin_request("POST", "/admin/issue", {})


def test_network_error_wrapped(monkeypatch):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t")

def _boom(*a, **k):
raise httpx.ConnectError("refused")

monkeypatch.setattr(httpx, "get", _boom)
with pytest.raises(AdminError) as ei:
admin_request("GET", "/admin/list")
assert "reach worker" in str(ei.value)


def test_insecure_endpoint_warns(monkeypatch, capsys):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t")
monkeypatch.setenv("TETHER_LICENSE_ENDPOINT", "http://license.example.com")
monkeypatch.setattr(httpx, "get", lambda *a, **k: _FakeResp(200, {}))
admin_request("GET", "/admin/list")
assert "non-HTTPS" in capsys.readouterr().err


def test_localhost_http_does_not_warn(monkeypatch, capsys):
monkeypatch.setenv("TETHER_ADMIN_TOKEN", "t")
monkeypatch.setenv("TETHER_LICENSE_ENDPOINT", "http://127.0.0.1:8787")
monkeypatch.setattr(httpx, "get", lambda *a, **k: _FakeResp(200, {}))
admin_request("GET", "/admin/list")
assert "non-HTTPS" not in capsys.readouterr().err


# ---- command main() exit codes ------------------------------------------

def test_issue_license_admin_error_returns_code(monkeypatch):
def _raise(*a, **k):
raise AdminError("nope", exit_code=2)

monkeypatch.setattr(issue_license, "admin_request", _raise)
rc = issue_license.main(["--customer-id", "a@b.com"])
assert rc == 2


def test_issue_license_success(monkeypatch, capsys):
monkeypatch.setattr(
issue_license, "admin_request",
lambda *a, **k: {"license_id": "lic_1", "activation_code": "REFLEX-AAAA",
"license": {"expires_at": "2026-12-01"}},
)
rc = issue_license.main(["--customer-id", "a@b.com", "--tier", "pro"])
assert rc == 0
assert "lic_1" in capsys.readouterr().out


def test_issue_license_bad_max_seats(monkeypatch):
assert issue_license.main(["--customer-id", "a@b.com", "--max-seats", "0"]) == 2


def test_list_licenses_bad_limit():
assert list_licenses.main(["--limit", "0"]) == 2


def test_revoke_license_admin_error_returns_code(monkeypatch):
def _raise(*a, **k):
raise AdminError("gone", exit_code=2)

monkeypatch.setattr(revoke_license, "admin_request", _raise)
assert revoke_license.main(["--license-id", "lic_x"]) == 2
Loading