Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/tether/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
"""Integration framework — connect external tools to tether."""
from tether.integrations.connector import connect, disconnect
from tether.integrations.registry import (
Integration,
get_integration,
list_integrations,
)

__all__ = ["Integration", "get_integration", "list_integrations"]
__all__ = [
"Integration",
"connect",
"disconnect",
"get_integration",
"list_integrations",
]
79 changes: 74 additions & 5 deletions src/tether/integrations/connector.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
"""Connector — install, start, query, and stop integrations."""
from __future__ import annotations

import json
import logging
import os
import signal
import subprocess
import time
from pathlib import Path
from typing import Any

import requests

from tether.integrations.registry import Integration, get_integration
from tether.integrations.registry import get_integration, state_dir

logger = logging.getLogger(__name__)

# Same-process handle cache. NOT relied on for stop — `tether connect up`
# Popens a child then the CLI process exits, so by the time `tether connect
# down` runs in a NEW process this dict is empty. The pid file below is what
# makes stop work across invocations.
_RUNNING: dict[str, subprocess.Popen] = {}


def _pid_file(name: str) -> Path:
return state_dir() / f"{name}.pid"


def _write_pid(name: str, pid: int) -> None:
_pid_file(name).write_text(str(pid))


def _read_pid(name: str) -> int | None:
try:
return int(_pid_file(name).read_text().strip())
except (OSError, ValueError):
return None


def _pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True # exists but owned by another user
return True


def connect(name: str, extra_args: list[str] | None = None) -> dict[str, Any]:
integration = get_integration(name)
if integration is None:
Expand All @@ -35,6 +67,9 @@ def connect(name: str, extra_args: list[str] | None = None) -> dict[str, Any]:

proc = integration.start(extra_args=extra_args)
_RUNNING[name] = proc
# Persist the pid so a later `tether connect down` (a separate CLI process)
# can actually stop this child instead of orphaning it.
_write_pid(name, proc.pid)

return {
"status": "started",
Expand All @@ -45,14 +80,48 @@ def connect(name: str, extra_args: list[str] | None = None) -> dict[str, Any]:
}


def _signal_for(integration) -> int:
sig_name = getattr(integration, "stop_signal", "SIGTERM") if integration else "SIGTERM"
return getattr(signal, sig_name, signal.SIGTERM)


def disconnect(name: str) -> dict[str, Any]:
integration = get_integration(name)

# 1) Same-process handle (rare — only if up + down ran in one process).
proc = _RUNNING.pop(name, None)
if proc is not None and proc.poll() is None:
proc.terminate()
proc.wait(timeout=10)
proc.send_signal(_signal_for(integration))
try:
proc.wait(timeout=10)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=5)
_pid_file(name).unlink(missing_ok=True)
return {"status": "stopped", "name": name, "pid": proc.pid}

integration = get_integration(name)
# 2) Cross-invocation: signal the pid we persisted at connect time.
pid = _read_pid(name)
if pid is not None and _pid_alive(pid):
sig = _signal_for(integration)
try:
os.kill(pid, sig)
# Wait for it to actually exit; escalate to SIGKILL if it won't.
for _ in range(10):
if not _pid_alive(pid):
break
time.sleep(1)
else:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass
_pid_file(name).unlink(missing_ok=True)
return {"status": "stopped", "name": name, "pid": pid}

# 3) Stale pid file (process already gone) — clean it up.
_pid_file(name).unlink(missing_ok=True)

# 4) Something is still answering on the port but we didn't start it.
if integration and integration.health_check():
return {"status": "external_still_running", "name": name}

Expand Down
106 changes: 80 additions & 26 deletions src/tether/integrations/registry.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,45 @@
"""Integration registry — known external tools tether can connect to."""
from __future__ import annotations

import importlib
import importlib.util
import logging
import shutil
import os
import subprocess
import sys
import time
from dataclasses import dataclass, field
from typing import Any
from pathlib import Path

import requests

logger = logging.getLogger(__name__)


def state_dir() -> Path:
"""Per-user dir for integration runtime state (pid + log files).

Honors TETHER_HOME so it agrees with the rest of the CLI's config home.
"""
home = Path(os.environ.get("TETHER_HOME", Path.home() / ".tether"))
d = home / "integrations"
d.mkdir(parents=True, exist_ok=True)
return d


@dataclass(frozen=True)
class Integration:
name: str
description: str
pip_package: str
pip_extras: str = ""
# Module name to import for the installed-check. Defaults to the pip
# package with '-' → '_'; set explicitly when they differ (the common
# case for any package whose import name isn't its pip name).
import_name: str = ""
# Optional version constraint applied at install time (e.g. ">=1.2,<2").
# Empty = unpinned (a future release of the integration can change
# behavior silently — set this for anything load-bearing).
pip_version_spec: str = ""
health_url: str = "http://localhost:8000/healthz"
start_command: list[str] = field(default_factory=list)
stop_signal: str = "SIGTERM"
Expand All @@ -31,24 +50,38 @@ class Integration:

@property
def pip_spec(self) -> str:
spec = self.pip_package
if self.pip_extras:
return f"{self.pip_package}[{self.pip_extras}]"
return self.pip_package
spec = f"{spec}[{self.pip_extras}]"
if self.pip_version_spec:
spec = f"{spec}{self.pip_version_spec}"
return spec

@property
def _import_name(self) -> str:
return self.import_name or self.pip_package.replace("-", "_")

@property
def log_file(self) -> Path:
return state_dir() / f"{self.name}.log"

def is_installed(self) -> bool:
# find_spec resolves the module WITHOUT importing it — so checking
# "is it installed?" doesn't run the target package's import-time side
# effects (rtsm[gpu] can init CUDA on import).
try:
importlib.import_module(self.pip_package.replace("-", "_").split("[")[0])
return True
except ImportError:
return importlib.util.find_spec(self._import_name) is not None
except (ImportError, ValueError):
return False

def install(self) -> None:
logger.info("Installing %s...", self.pip_spec)
cmd = [
sys.executable, "-m", "pip", "install", self.pip_spec,
"--extra-index-url", "https://download.pytorch.org/whl/cu128",
"-q",
]
logger.info("Installing %s into %s ...", self.pip_spec, sys.executable)
cmd = [sys.executable, "-m", "pip", "install", self.pip_spec]
# The PyTorch CUDA wheel index is only correct on Linux + CUDA. On
# macOS, Jetson (JetPack wheels), and CPU boxes it's useless or wrong,
# so don't force it there.
if sys.platform.startswith("linux") and "cu" in self.pip_extras.lower():
cmd += ["--extra-index-url", "https://download.pytorch.org/whl/cu128"]
subprocess.check_call(cmd)
logger.info("Installed %s", self.pip_spec)

Expand All @@ -63,23 +96,44 @@ def start(self, extra_args: list[str] | None = None) -> subprocess.Popen:
cmd = list(self.start_command)
if extra_args:
cmd.extend(extra_args)
logger.info("Starting %s: %s", self.name, " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
for _ in range(30):
if self.health_check(timeout=1.0):
logger.info("%s is healthy at %s", self.name, self.health_url)
return proc
time.sleep(1)
log_path = self.log_file
logger.info("Starting %s: %s (logs → %s)", self.name, " ".join(cmd), log_path)
# Redirect child output to a log file, NOT PIPE: an unread PIPE fills
# its 64KB buffer and the child blocks, looking like a hang. The log
# file also gives `connect` a real place to point users on failure.
log_fh = open(log_path, "wb")
proc = subprocess.Popen(cmd, stdout=log_fh, stderr=subprocess.STDOUT)
try:
for _ in range(30):
# Fail fast if the child died on launch (bad flag, missing
# model) instead of waiting the full 30s for a health check
# that can never pass.
if proc.poll() is not None:
tail = self._log_tail(log_path)
raise RuntimeError(
f"{self.name} exited immediately (code {proc.returncode}). "
f"Last log lines:\n{tail}\nFull log: {log_path}"
)
if self.health_check(timeout=1.0):
logger.info("%s is healthy at %s", self.name, self.health_url)
return proc
time.sleep(1)
finally:
log_fh.close()
proc.terminate()
tail = self._log_tail(log_path)
raise RuntimeError(
f"{self.name} failed to become healthy at {self.health_url} "
f"within 30s. Check logs with: {' '.join(cmd)}"
f"within 30s. Last log lines:\n{tail}\nFull log: {log_path}"
)

@staticmethod
def _log_tail(path: Path, n: int = 20) -> str:
try:
return "\n".join(path.read_text(errors="replace").splitlines()[-n:])
except OSError:
return "(log unavailable)"


RTSM = Integration(
name="rtsm",
Expand Down
95 changes: 95 additions & 0 deletions tests/test_integrations_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Integration connect/disconnect lifecycle.

Pins the fix for `tether connect down` being a structural no-op (the running
handle lived in a module dict that died with the CLI process) by persisting the
pid, plus the registry hardening (find_spec install-check, fail-fast start,
version-pinned spec).
"""
from __future__ import annotations

import subprocess
import sys

import pytest

from tether.integrations import connector
from tether.integrations.registry import Integration


@pytest.fixture(autouse=True)
def _isolated_home(tmp_path, monkeypatch):
monkeypatch.setenv("TETHER_HOME", str(tmp_path))
connector._RUNNING.clear()
yield


def test_disconnect_stops_pid_from_a_separate_process():
"""The pid persisted at connect time lets a fresh process actually stop it."""
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
try:
connector._write_pid("rtsm", proc.pid)
connector._RUNNING.clear() # simulate: `connect down` is a new CLI process
assert connector._pid_alive(proc.pid)

result = connector.disconnect("rtsm")

assert result["status"] == "stopped"
assert result["pid"] == proc.pid
proc.wait(timeout=5)
assert not connector._pid_alive(proc.pid)
assert not connector._pid_file("rtsm").exists()
finally:
if proc.poll() is None:
proc.kill()
proc.wait(timeout=5)


def test_disconnect_cleans_stale_pid_file():
dead = subprocess.Popen([sys.executable, "-c", "pass"])
dead.wait()
connector._write_pid("rtsm", dead.pid)
connector._RUNNING.clear()

result = connector.disconnect("rtsm")

assert result["status"] in ("not_running", "external_still_running")
assert not connector._pid_file("rtsm").exists()


def test_is_installed_uses_find_spec_no_import():
present = Integration(name="a", description="", pip_package="os")
assert present.is_installed() is True
absent = Integration(name="b", description="", pip_package="totally-not-real-pkg-xyz")
assert absent.is_installed() is False


def test_is_installed_honors_explicit_import_name():
# pip name != import name: declare it.
i = Integration(name="c", description="", pip_package="pip-thing", import_name="json")
assert i.is_installed() is True


def test_start_fails_fast_on_immediate_exit():
integ = Integration(
name="boom",
description="",
pip_package="x",
start_command=[sys.executable, "-c", "import sys; sys.exit(3)"],
health_url="http://localhost:59999/healthz",
)
with pytest.raises(RuntimeError) as ei:
integ.start()
assert "exited immediately" in str(ei.value)
assert integ.log_file.exists()


def test_pip_spec_includes_extras_and_version():
i = Integration(
name="z", description="", pip_package="rtsm",
pip_extras="gpu", pip_version_spec=">=1.0,<2",
)
assert i.pip_spec == "rtsm[gpu]>=1.0,<2"


def test_connect_disconnect_exported():
from tether.integrations import connect, disconnect # noqa: F401
Loading