diff --git a/src/tether/integrations/__init__.py b/src/tether/integrations/__init__.py index 7c869c6..b6feecc 100644 --- a/src/tether/integrations/__init__.py +++ b/src/tether/integrations/__init__.py @@ -1,8 +1,15 @@ """Integration framework — connect external tools to tether.""" +from tether.integrations.connector import connect, disconnect from tether.integrations.registry import ( Integration, get_integration, list_integrations, ) -__all__ = ["Integration", "get_integration", "list_integrations"] +__all__ = [ + "Integration", + "connect", + "disconnect", + "get_integration", + "list_integrations", +] diff --git a/src/tether/integrations/connector.py b/src/tether/integrations/connector.py index 8c8b375..8d36d1d 100644 --- a/src/tether/integrations/connector.py +++ b/src/tether/integrations/connector.py @@ -1,20 +1,52 @@ """Connector — install, start, query, and stop integrations.""" from __future__ import annotations -import json import logging +import os +import signal import subprocess +import time +from pathlib import Path from typing import Any import requests -from tether.integrations.registry import Integration, get_integration +from tether.integrations.registry import get_integration, state_dir logger = logging.getLogger(__name__) +# Same-process handle cache. NOT relied on for stop — `tether connect up` +# Popens a child then the CLI process exits, so by the time `tether connect +# down` runs in a NEW process this dict is empty. The pid file below is what +# makes stop work across invocations. _RUNNING: dict[str, subprocess.Popen] = {} +def _pid_file(name: str) -> Path: + return state_dir() / f"{name}.pid" + + +def _write_pid(name: str, pid: int) -> None: + _pid_file(name).write_text(str(pid)) + + +def _read_pid(name: str) -> int | None: + try: + return int(_pid_file(name).read_text().strip()) + except (OSError, ValueError): + return None + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True # exists but owned by another user + return True + + def connect(name: str, extra_args: list[str] | None = None) -> dict[str, Any]: integration = get_integration(name) if integration is None: @@ -35,6 +67,9 @@ def connect(name: str, extra_args: list[str] | None = None) -> dict[str, Any]: proc = integration.start(extra_args=extra_args) _RUNNING[name] = proc + # Persist the pid so a later `tether connect down` (a separate CLI process) + # can actually stop this child instead of orphaning it. + _write_pid(name, proc.pid) return { "status": "started", @@ -45,14 +80,48 @@ def connect(name: str, extra_args: list[str] | None = None) -> dict[str, Any]: } +def _signal_for(integration) -> int: + sig_name = getattr(integration, "stop_signal", "SIGTERM") if integration else "SIGTERM" + return getattr(signal, sig_name, signal.SIGTERM) + + def disconnect(name: str) -> dict[str, Any]: + integration = get_integration(name) + + # 1) Same-process handle (rare — only if up + down ran in one process). proc = _RUNNING.pop(name, None) if proc is not None and proc.poll() is None: - proc.terminate() - proc.wait(timeout=10) + proc.send_signal(_signal_for(integration)) + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=5) + _pid_file(name).unlink(missing_ok=True) return {"status": "stopped", "name": name, "pid": proc.pid} - integration = get_integration(name) + # 2) Cross-invocation: signal the pid we persisted at connect time. + pid = _read_pid(name) + if pid is not None and _pid_alive(pid): + sig = _signal_for(integration) + try: + os.kill(pid, sig) + # Wait for it to actually exit; escalate to SIGKILL if it won't. + for _ in range(10): + if not _pid_alive(pid): + break + time.sleep(1) + else: + os.kill(pid, signal.SIGKILL) + except ProcessLookupError: + pass + _pid_file(name).unlink(missing_ok=True) + return {"status": "stopped", "name": name, "pid": pid} + + # 3) Stale pid file (process already gone) — clean it up. + _pid_file(name).unlink(missing_ok=True) + + # 4) Something is still answering on the port but we didn't start it. if integration and integration.health_check(): return {"status": "external_still_running", "name": name} diff --git a/src/tether/integrations/registry.py b/src/tether/integrations/registry.py index 6d1c882..cdb8fb3 100644 --- a/src/tether/integrations/registry.py +++ b/src/tether/integrations/registry.py @@ -1,26 +1,45 @@ """Integration registry — known external tools tether can connect to.""" from __future__ import annotations -import importlib +import importlib.util import logging -import shutil +import os import subprocess import sys import time from dataclasses import dataclass, field -from typing import Any +from pathlib import Path import requests logger = logging.getLogger(__name__) +def state_dir() -> Path: + """Per-user dir for integration runtime state (pid + log files). + + Honors TETHER_HOME so it agrees with the rest of the CLI's config home. + """ + home = Path(os.environ.get("TETHER_HOME", Path.home() / ".tether")) + d = home / "integrations" + d.mkdir(parents=True, exist_ok=True) + return d + + @dataclass(frozen=True) class Integration: name: str description: str pip_package: str pip_extras: str = "" + # Module name to import for the installed-check. Defaults to the pip + # package with '-' → '_'; set explicitly when they differ (the common + # case for any package whose import name isn't its pip name). + import_name: str = "" + # Optional version constraint applied at install time (e.g. ">=1.2,<2"). + # Empty = unpinned (a future release of the integration can change + # behavior silently — set this for anything load-bearing). + pip_version_spec: str = "" health_url: str = "http://localhost:8000/healthz" start_command: list[str] = field(default_factory=list) stop_signal: str = "SIGTERM" @@ -31,24 +50,38 @@ class Integration: @property def pip_spec(self) -> str: + spec = self.pip_package if self.pip_extras: - return f"{self.pip_package}[{self.pip_extras}]" - return self.pip_package + spec = f"{spec}[{self.pip_extras}]" + if self.pip_version_spec: + spec = f"{spec}{self.pip_version_spec}" + return spec + + @property + def _import_name(self) -> str: + return self.import_name or self.pip_package.replace("-", "_") + + @property + def log_file(self) -> Path: + return state_dir() / f"{self.name}.log" def is_installed(self) -> bool: + # find_spec resolves the module WITHOUT importing it — so checking + # "is it installed?" doesn't run the target package's import-time side + # effects (rtsm[gpu] can init CUDA on import). try: - importlib.import_module(self.pip_package.replace("-", "_").split("[")[0]) - return True - except ImportError: + return importlib.util.find_spec(self._import_name) is not None + except (ImportError, ValueError): return False def install(self) -> None: - logger.info("Installing %s...", self.pip_spec) - cmd = [ - sys.executable, "-m", "pip", "install", self.pip_spec, - "--extra-index-url", "https://download.pytorch.org/whl/cu128", - "-q", - ] + logger.info("Installing %s into %s ...", self.pip_spec, sys.executable) + cmd = [sys.executable, "-m", "pip", "install", self.pip_spec] + # The PyTorch CUDA wheel index is only correct on Linux + CUDA. On + # macOS, Jetson (JetPack wheels), and CPU boxes it's useless or wrong, + # so don't force it there. + if sys.platform.startswith("linux") and "cu" in self.pip_extras.lower(): + cmd += ["--extra-index-url", "https://download.pytorch.org/whl/cu128"] subprocess.check_call(cmd) logger.info("Installed %s", self.pip_spec) @@ -63,23 +96,44 @@ def start(self, extra_args: list[str] | None = None) -> subprocess.Popen: cmd = list(self.start_command) if extra_args: cmd.extend(extra_args) - logger.info("Starting %s: %s", self.name, " ".join(cmd)) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - for _ in range(30): - if self.health_check(timeout=1.0): - logger.info("%s is healthy at %s", self.name, self.health_url) - return proc - time.sleep(1) + log_path = self.log_file + logger.info("Starting %s: %s (logs → %s)", self.name, " ".join(cmd), log_path) + # Redirect child output to a log file, NOT PIPE: an unread PIPE fills + # its 64KB buffer and the child blocks, looking like a hang. The log + # file also gives `connect` a real place to point users on failure. + log_fh = open(log_path, "wb") + proc = subprocess.Popen(cmd, stdout=log_fh, stderr=subprocess.STDOUT) + try: + for _ in range(30): + # Fail fast if the child died on launch (bad flag, missing + # model) instead of waiting the full 30s for a health check + # that can never pass. + if proc.poll() is not None: + tail = self._log_tail(log_path) + raise RuntimeError( + f"{self.name} exited immediately (code {proc.returncode}). " + f"Last log lines:\n{tail}\nFull log: {log_path}" + ) + if self.health_check(timeout=1.0): + logger.info("%s is healthy at %s", self.name, self.health_url) + return proc + time.sleep(1) + finally: + log_fh.close() proc.terminate() + tail = self._log_tail(log_path) raise RuntimeError( f"{self.name} failed to become healthy at {self.health_url} " - f"within 30s. Check logs with: {' '.join(cmd)}" + f"within 30s. Last log lines:\n{tail}\nFull log: {log_path}" ) + @staticmethod + def _log_tail(path: Path, n: int = 20) -> str: + try: + return "\n".join(path.read_text(errors="replace").splitlines()[-n:]) + except OSError: + return "(log unavailable)" + RTSM = Integration( name="rtsm", diff --git a/tests/test_integrations_lifecycle.py b/tests/test_integrations_lifecycle.py new file mode 100644 index 0000000..d28aea1 --- /dev/null +++ b/tests/test_integrations_lifecycle.py @@ -0,0 +1,95 @@ +"""Integration connect/disconnect lifecycle. + +Pins the fix for `tether connect down` being a structural no-op (the running +handle lived in a module dict that died with the CLI process) by persisting the +pid, plus the registry hardening (find_spec install-check, fail-fast start, +version-pinned spec). +""" +from __future__ import annotations + +import subprocess +import sys + +import pytest + +from tether.integrations import connector +from tether.integrations.registry import Integration + + +@pytest.fixture(autouse=True) +def _isolated_home(tmp_path, monkeypatch): + monkeypatch.setenv("TETHER_HOME", str(tmp_path)) + connector._RUNNING.clear() + yield + + +def test_disconnect_stops_pid_from_a_separate_process(): + """The pid persisted at connect time lets a fresh process actually stop it.""" + proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"]) + try: + connector._write_pid("rtsm", proc.pid) + connector._RUNNING.clear() # simulate: `connect down` is a new CLI process + assert connector._pid_alive(proc.pid) + + result = connector.disconnect("rtsm") + + assert result["status"] == "stopped" + assert result["pid"] == proc.pid + proc.wait(timeout=5) + assert not connector._pid_alive(proc.pid) + assert not connector._pid_file("rtsm").exists() + finally: + if proc.poll() is None: + proc.kill() + proc.wait(timeout=5) + + +def test_disconnect_cleans_stale_pid_file(): + dead = subprocess.Popen([sys.executable, "-c", "pass"]) + dead.wait() + connector._write_pid("rtsm", dead.pid) + connector._RUNNING.clear() + + result = connector.disconnect("rtsm") + + assert result["status"] in ("not_running", "external_still_running") + assert not connector._pid_file("rtsm").exists() + + +def test_is_installed_uses_find_spec_no_import(): + present = Integration(name="a", description="", pip_package="os") + assert present.is_installed() is True + absent = Integration(name="b", description="", pip_package="totally-not-real-pkg-xyz") + assert absent.is_installed() is False + + +def test_is_installed_honors_explicit_import_name(): + # pip name != import name: declare it. + i = Integration(name="c", description="", pip_package="pip-thing", import_name="json") + assert i.is_installed() is True + + +def test_start_fails_fast_on_immediate_exit(): + integ = Integration( + name="boom", + description="", + pip_package="x", + start_command=[sys.executable, "-c", "import sys; sys.exit(3)"], + health_url="http://localhost:59999/healthz", + ) + with pytest.raises(RuntimeError) as ei: + integ.start() + assert "exited immediately" in str(ei.value) + assert integ.log_file.exists() + + +def test_pip_spec_includes_extras_and_version(): + i = Integration( + name="z", description="", pip_package="rtsm", + pip_extras="gpu", pip_version_spec=">=1.0,<2", + ) + assert i.pip_spec == "rtsm[gpu]>=1.0,<2" + + +def test_connect_disconnect_exported(): + from tether.integrations import connect, disconnect # noqa: F401