diff --git a/src/openenv/core/containers/runtime/__init__.py b/src/openenv/core/containers/runtime/__init__.py index a72b5301..442bb035 100644 --- a/src/openenv/core/containers/runtime/__init__.py +++ b/src/openenv/core/containers/runtime/__init__.py @@ -6,10 +6,13 @@ """Container runtime providers.""" -from .providers import ContainerProvider, KubernetesProvider, LocalDockerProvider +from .providers import ContainerProvider, KubernetesProvider, LocalDockerProvider, RuntimeProvider +from .uv_provider import UVProvider __all__ = [ "ContainerProvider", "LocalDockerProvider", "KubernetesProvider", + "RuntimeProvider", + "UVProvider", ] \ No newline at end of file diff --git a/src/openenv/core/containers/runtime/providers.py b/src/openenv/core/containers/runtime/providers.py index a8022ddc..c04086c9 100644 --- a/src/openenv/core/containers/runtime/providers.py +++ b/src/openenv/core/containers/runtime/providers.py @@ -118,10 +118,12 @@ def __init__(self): capture_output=True, timeout=5, ) - except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): - raise RuntimeError( - "Docker is not available. Please install Docker Desktop or Docker Engine." - ) + except ( + subprocess.CalledProcessError, + FileNotFoundError, + subprocess.TimeoutExpired, + ): + raise RuntimeError("Docker is not available. Please install Docker Desktop or Docker Engine.") def start_container( self, @@ -154,10 +156,13 @@ def start_container( # Build docker run command cmd = [ - "docker", "run", + "docker", + "run", "-d", # Detached - "--name", self._container_name, - "-p", f"{port}:8000", # Map port + "--name", + self._container_name, + "-p", + f"{port}:8000", # Map port ] # Add environment variables @@ -241,9 +246,7 @@ def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None: time.sleep(0.5) - raise TimeoutError( - f"Container at {base_url} did not become ready within {timeout_s}s" - ) + raise TimeoutError(f"Container at {base_url} did not become ready within {timeout_s}s") def _find_available_port(self) -> int: """ @@ -290,4 +293,67 @@ class KubernetesProvider(ContainerProvider): >>> # Pod running in k8s, accessible via service or port-forward >>> provider.stop_container() """ + pass + + +class RuntimeProvider(ABC): + """ + Abstract base class for runtime providers that are not container providers. + Providers implement this interface to support different runtime platforms: + - UVProvider: Runs environments via `uv run` + + The provider manages a single runtime lifecycle and provides the base URL + for connecting to it. + + Example: + >>> provider = UVProvider(project_path="/path/to/env") + >>> base_url = provider.start() + >>> print(base_url) # http://localhost:8000 + >>> provider.stop() + """ + + @abstractmethod + def start( + self, + port: Optional[int] = None, + env_vars: Optional[Dict[str, str]] = None, + **kwargs: Any, + ) -> str: + """ + Start a runtime from the specified image. + + Args: + image: Runtime image name + port: Port to expose (if None, provider chooses) + env_vars: Environment variables for the runtime + **kwargs: Additional runtime options + """ + + @abstractmethod + def stop(self) -> None: + """ + Stop the runtime. + """ + pass + + @abstractmethod + def wait_for_ready(self, timeout_s: float = 30.0) -> None: + """ + Wait for the runtime to be ready to accept requests. + """ + pass + + def __enter__(self) -> "RuntimeProvider": + """ + Enter the runtime provider. + """ + self.start() + return self + + def __exit__(self, exc_type, exc, tb) -> None: + """ + Exit the runtime provider. + """ + self.stop() + return False diff --git a/src/openenv/core/containers/runtime/uv_provider.py b/src/openenv/core/containers/runtime/uv_provider.py new file mode 100644 index 00000000..3ddc89b9 --- /dev/null +++ b/src/openenv/core/containers/runtime/uv_provider.py @@ -0,0 +1,224 @@ +"""Providers for launching ASGI applications via ``uv run``.""" + +from __future__ import annotations + +import os +import socket +import subprocess +import time +from typing import Dict, Optional + +import requests + +from .providers import RuntimeProvider + + +def _check_uv_installed() -> None: + try: + subprocess.check_output(["uv", "--version"]) + except FileNotFoundError as exc: + raise RuntimeError( + "`uv` executable not found. Install uv from https://docs.astral.sh and ensure it is on PATH." + ) from exc + + +def _find_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("", 0)) + sock.listen(1) + return sock.getsockname()[1] + + +def _create_uv_command( + *, + host: str, + port: int, + reload: bool, + workers: int, + app: str, + project_path: str, +) -> list[str]: + command: list[str] = ["uv", "run", "--isolated", "--project", project_path] + + command.append("--") + command.extend( + [ + "uvicorn", + app, + "--host", + host, + "--port", + str(port), + "--workers", + str(workers), + ] + ) + + if reload: + command.append("--reload") + + return command + + +def _poll_health(health_url: str, timeout_s: float) -> None: + """Poll a health endpoint until it returns HTTP 200 or times out.""" + + deadline = time.time() + timeout_s + while time.time() < deadline: + try: + timeout = max(0.0001, min(deadline - time.time(), 2.0)) + response = requests.get(health_url, timeout=timeout) + if response.status_code == 200: + return + except requests.RequestException: + continue + + time.sleep(0.5) + + raise TimeoutError(f"Server did not become ready within {timeout_s:.1f} seconds") + + +class UVProvider(RuntimeProvider): + """ + RuntimeProvider implementation backed by ``uv run``. + + Args: + project_path: Local path to a uv project (passed to ``uv run --project``) + app: ASGI application path for uvicorn (defaults to ``server.app:app``) + host: Host interface to bind to (defaults to ``0.0.0.0``) + reload: Whether to enable uvicorn's reload mode + env_vars: Environment variables to pass through to the spawned process + context_timeout_s: How long to wait for the environment to become ready + + Example: + >>> provider = UVProvider(project_path="/path/to/env") + >>> base_url = provider.start() + >>> print(base_url) # http://localhost:8000 + >>> # Use the environment via base_url + >>> provider.stop() + """ + + def __init__( + self, + *, + project_path: str, + app: str = "server.app:app", + host: str = "0.0.0.0", + reload: bool = False, + env_vars: Optional[Dict[str, str]] = None, + context_timeout_s: float = 60.0, + ): + """Initialize the UVProvider.""" + self.project_path = os.path.abspath(project_path) + self.app = app + self.host = host + self.reload = reload + self.env_vars = env_vars + self.context_timeout_s = context_timeout_s + _check_uv_installed() + self._process = None + self._base_url = None + + def start( + self, + port: Optional[int] = None, + env_vars: Optional[Dict[str, str]] = None, + workers: int = 1, + **_: Dict[str, str], + ) -> str: + """ + Start the environment via `uv run`. + + Args: + port: The port to bind the environment to + env_vars: Environment variables to pass to the environment + workers: The number of workers to use + + Returns: + The base URL of the environment + + Raises: + RuntimeError: If the environment is already running + """ + if self._process is not None and self._process.poll() is None: + raise RuntimeError("UVProvider is already running") + + bind_port = port or _find_free_port() + + command = _create_uv_command( + host=self.host, + port=bind_port, + reload=self.reload, + workers=workers, + app=self.app, + project_path=self.project_path, + ) + + env = os.environ.copy() + + if self.env_vars: + env.update(self.env_vars) + if env_vars: + env.update(env_vars) + + try: + self._process = subprocess.Popen(command, env=env) + except OSError as exc: + raise RuntimeError(f"Failed to launch `uv run`: {exc}") from exc + + client_host = "127.0.0.1" if self.host in {"0.0.0.0", "::"} else self.host + self._base_url = f"http://{client_host}:{bind_port}" + return self._base_url + + def wait_for_ready(self, timeout_s: float = 60.0) -> None: + """ + Wait for the environment to become ready. + + Args: + timeout_s: The timeout to wait for the environment to become ready + + Raises: + RuntimeError: If the environment is not running + TimeoutError: If the environment does not become ready within the timeout + """ + if self._process and self._process.poll() is not None: + code = self._process.returncode + raise RuntimeError(f"uv process exited prematurely with code {code}") + + _poll_health(f"{self._base_url}/health", timeout_s=timeout_s) + + def stop(self) -> None: + """ + Stop the environment. + + Raises: + RuntimeError: If the environment is not running + """ + if self._process is None: + return + + if self._process.poll() is None: + self._process.terminate() + try: + self._process.wait(timeout=10.0) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait(timeout=5.0) + + self._process = None + self._base_url = None + + @property + def base_url(self) -> str: + """ + The base URL of the environment. + + Returns: + The base URL of the environment + + Raises: + RuntimeError: If the environment is not running + """ + if self._base_url is None: + raise RuntimeError("UVProvider has not been started") + return self._base_url diff --git a/src/openenv/core/http_env_client.py b/src/openenv/core/http_env_client.py index 007ef6a5..6529f012 100644 --- a/src/openenv/core/http_env_client.py +++ b/src/openenv/core/http_env_client.py @@ -1,236 +1,279 @@ -""" -core/runner_env.py -Minimal HTTP-based environment client. -- Talks to a single env worker exposing: POST /reset, POST /step - -Future hooks (commented below) for: -- episode_id, seed on reset -- request_id on step -- custom headers (auth/trace) -""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar - -import requests - -from .client_types import StepResult -from .containers.runtime import LocalDockerProvider - -if TYPE_CHECKING: - from .containers.runtime import ContainerProvider - -ActT = TypeVar("ActT") -ObsT = TypeVar("ObsT") -EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient") - - -class HTTPEnvClient(ABC, Generic[ActT, ObsT]): - def __init__( - self, - base_url: str, - request_timeout_s: float = 15.0, - default_headers: Optional[Dict[str, str]] = None, - provider: Optional["ContainerProvider"] = None, - ): - self._base = base_url.rstrip("/") - self._timeout = float(request_timeout_s) - self._http = requests.Session() - self._headers = default_headers or {} - self._provider = provider - - @classmethod - def from_docker_image( - cls: Type[EnvClientT], - image: str, - provider: Optional["ContainerProvider"] = None, - **kwargs: Any, - ) -> EnvClientT: - """ - Create an environment client by spinning up a Docker container locally. - - This is a development utility that: - 1. Starts a Docker container from the specified image - 2. Waits for the server to be ready - 3. Creates and returns a client instance connected to the container - - Note: The container lifecycle management is left to the user or higher-level - orchestration. The container will keep running until manually stopped. - - Args: - image: Docker image name to run (e.g., "echo-env:latest") - provider: Container provider to use (defaults to LocalDockerProvider) - **kwargs: Additional arguments to pass to provider.start_container() - (e.g., env_vars, port) - - Returns: - An instance of the client class connected to the running container - - Example: - >>> from envs.coding_env.client import CodingEnv - >>> from envs.coding_env.models import CodeAction - >>> - >>> # Create environment from image - >>> env = CodingEnv.from_docker_image("coding-env:latest") - >>> - >>> # Create environment with custom env vars - >>> env = CodingEnv.from_docker_image( - ... "coding-env:latest", - ... env_vars={"MY_VAR": "value"} - ... ) - >>> - >>> # Use the environment - >>> result = env.reset() - >>> print(result.observation) - >>> - >>> step_result = env.step(CodeAction(code="print('hello')")) - >>> print(step_result.observation.stdout) - >>> - >>> # Cleanup (optional) - >>> env.close() - """ - - # Use default provider if none provided - if provider is None: - provider = LocalDockerProvider() - - # 1. Start container with optional kwargs (e.g., env_vars, port) - base_url = provider.start_container(image, **kwargs) - - # 2. Wait for server to be ready - provider.wait_for_ready(base_url) - - # 3. Create and return client instance with provider reference - return cls(base_url=base_url, provider=provider) - - @classmethod - def from_hub( - cls: Type[EnvClientT], - repo_id: str, - provider: Optional["ContainerProvider"] = None, - **kwargs: Any, - ) -> EnvClientT: - """ - Create an environment client by pulling from a Hugging Face model hub. - """ - - if provider is None: - provider = LocalDockerProvider() - - if "tag" in kwargs: - tag = kwargs["tag"] - else: - tag = "latest" - - base_url = f"registry.hf.space/{repo_id.replace('/', '-')}:{tag}" - - return cls.from_docker_image(image=base_url, provider=provider) - - @abstractmethod - def _step_payload(self, action: ActT) -> dict: - """Convert an Action object to the JSON body expected by the env server.""" - raise NotImplementedError - - @abstractmethod - def _parse_result(self, payload: dict) -> StepResult[ObsT]: - """Convert a JSON response from the env server to StepResult[ObsT].""" - raise NotImplementedError - - @abstractmethod - def _parse_state(self, payload: dict) -> Any: - """Convert a JSON response from the state endpoint to a State object.""" - raise NotImplementedError - - # ---------- Environment Server Interface Methods ---------- - def reset(self, **kwargs: Any) -> StepResult[ObsT]: - """ - Reset the environment with optional parameters. - - Args: - **kwargs: Optional parameters passed to the environment's reset method. - Common parameters include: - - seed: Random seed for reproducibility - - episode_id: Custom episode identifier - - Any environment-specific reset parameters - - Returns: - StepResult containing initial observation - - Example: - >>> env.reset(seed=42, episode_id="ep-001") - """ - body: Dict[str, Any] = kwargs.copy() - r = self._http.post( - f"{self._base}/reset", - json=body, - headers=self._headers, - timeout=self._timeout, - ) - r.raise_for_status() - return self._parse_result(r.json()) - - def step(self, action: ActT, **kwargs: Any) -> StepResult[ObsT]: - """ - Execute an action in the environment with optional parameters. - - Args: - action: The action to execute - **kwargs: Optional parameters passed to the environment's step method. - Common parameters include: - - timeout_s: Execution timeout in seconds - - request_id: Request identifier for tracking - - render: Whether to render the environment - - Any environment-specific step parameters - - Returns: - StepResult containing observation, reward, and done status - - Example: - >>> env.step(action, timeout_s=30.0, request_id="req-123", render=True) - """ - body: Dict[str, Any] = { - "action": self._step_payload(action), - **kwargs # Forward all additional parameters - } - r = self._http.post( - f"{self._base}/step", - json=body, - headers=self._headers, - timeout=self._timeout, - ) - r.raise_for_status() - return self._parse_result(r.json()) - - def state(self) -> Any: - """ - Get the current environment state from the server. - - Returns: - State object with environment state information (e.g., episode_id, step_count) - - Example: - >>> client = EchoEnv.from_docker_image("echo-env:latest") - >>> result = client.reset() - >>> state = client.state() - >>> print(state.episode_id) - >>> print(state.step_count) - """ - r = self._http.get( - f"{self._base}/state", - headers=self._headers, - timeout=self._timeout, - ) - r.raise_for_status() - return self._parse_state(r.json()) - - def close(self) -> None: - """ - Close the environment and clean up resources. - - If this client was created via from_docker_image(), this will stop - and remove the associated container. - """ - if self._provider is not None: - self._provider.stop_container() +""" +core/runner_env.py +Minimal HTTP-based environment client. +- Talks to a single env worker exposing: POST /reset, POST /step + +Future hooks (commented below) for: +- episode_id, seed on reset +- request_id on step +- custom headers (auth/trace) +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar + +import requests + +from .client_types import StepResult +from .containers.runtime import LocalDockerProvider, UVProvider + +if TYPE_CHECKING: + from .containers.runtime import ContainerProvider, RuntimeProvider + +ActT = TypeVar("ActT") +ObsT = TypeVar("ObsT") +EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient") + + +class HTTPEnvClient(ABC, Generic[ActT, ObsT]): + def __init__( + self, + base_url: str, + request_timeout_s: float = 15.0, + default_headers: Optional[Dict[str, str]] = None, + provider: Optional["ContainerProvider"] = None, + ): + self._base = base_url.rstrip("/") + self._timeout = float(request_timeout_s) + self._http = requests.Session() + self._headers = default_headers or {} + self._provider = provider + + @property + def base_url(self) -> str: + """Base URL of the connected environment server.""" + return self._base + + @classmethod + def from_docker_image( + cls: Type[EnvClientT], + image: str, + provider: Optional["ContainerProvider"] = None, + **kwargs: Any, + ) -> EnvClientT: + """ + Create an environment client by spinning up a Docker container locally. + + This is a development utility that: + 1. Starts a Docker container from the specified image + 2. Waits for the server to be ready + 3. Creates and returns a client instance connected to the container + + Note: The container lifecycle management is left to the user or + higher-level orchestration. The container will keep running until + manually stopped. + + Args: + image: Docker image name to run (e.g., "echo-env:latest") + provider: Container provider to use (defaults to + LocalDockerProvider) + **kwargs: Additional arguments to pass to + provider.start_container() (e.g., env_vars, port) + + Returns: + An instance of the client class connected to the running container + + Example: + >>> from envs.coding_env.client import CodingEnv + >>> from envs.coding_env.models import CodeAction + >>> + >>> # Create environment from image + >>> env = CodingEnv.from_docker_image("coding-env:latest") + >>> + >>> # Create environment with custom env vars + >>> env = CodingEnv.from_docker_image( + ... "coding-env:latest", + ... env_vars={"MY_VAR": "value"} + ... ) + >>> + >>> # Use the environment + >>> result = env.reset() + >>> print(result.observation) + >>> + >>> step_result = env.step(CodeAction(code="print('hello')")) + >>> print(step_result.observation.stdout) + >>> + >>> # Cleanup (optional) + >>> env.close() + """ + + # Use default provider if none provided + if provider is None: + provider = LocalDockerProvider() + + # 1. Start container with optional kwargs (e.g., env_vars, port) + base_url = provider.start_container(image, **kwargs) + + # 2. Wait for server to be ready + provider.wait_for_ready(base_url) + + # 3. Create and return client instance with provider reference + return cls(base_url=base_url, provider=provider) + + @classmethod + def from_hub( + cls: Type[EnvClientT], + repo_id: str, + *, + use_docker: bool = True, + provider: Optional["ContainerProvider" | "RuntimeProvider"] = None, + **provider_kwargs: Any, + ) -> EnvClientT: + """Create a client from a Hugging Face Space. + + Args: + repo_id: Hugging Face space identifier ``{org}/{space}``. + use_docker: When ``True`` (default) pull from the HF registry and + launch via :class:`LocalDockerProvider`. When ``False`` run the + space locally with :class:`UVProvider`. + provider: Optional provider instance to reuse. Must be a + :class:`ContainerProvider` when ``use_docker=True`` and a + :class:`RuntimeProvider`` otherwise. + provider_kwargs: Additional keyword arguments forwarded to + either the container provider's ``start_container`` (docker) + or to the ``UVProvider`` constructor/start (uv). When + ``use_docker=False``, the ``project_path`` argument can be + used to override the default git URL + (``git+https://huggingface.co/spaces/{repo_id}``). + """ + + start_args = {} + for key in ("port", "env_vars", "workers"): + if key in provider_kwargs: + start_args[key] = provider_kwargs.pop(key) + + if use_docker: + docker_provider = provider or LocalDockerProvider() + tag = provider_kwargs.pop("tag", "latest") + image = f"registry.hf.space/{repo_id.replace('/', '-')}:{tag}" + base_url = docker_provider.start_container(image, **start_args, **provider_kwargs) + docker_provider.wait_for_ready(base_url) + return cls(base_url=base_url, provider=docker_provider) + else: + if provider is None: + uv_kwargs = dict(provider_kwargs) + project_path = uv_kwargs.pop("project_path", None) + if project_path is None: + project_path = f"git+https://huggingface.co/spaces/{repo_id}" + + provider = UVProvider(project_path=project_path, **uv_kwargs) + else: + if provider_kwargs: + raise ValueError("provider_kwargs cannot be used when supplying a provider instance") + + base_url = provider.start(**start_args) + provider.wait_for_ready() + + return cls(base_url=base_url, provider=provider) + + @abstractmethod + def _step_payload(self, action: ActT) -> dict: + """Convert an Action object to the JSON body expected by env server.""" + raise NotImplementedError + + @abstractmethod + def _parse_result(self, payload: dict) -> StepResult[ObsT]: + """Convert a JSON response from the env server to StepResult[ObsT].""" + raise NotImplementedError + + @abstractmethod + def _parse_state(self, payload: dict) -> Any: + """Convert a JSON response from state endpoint to a State object.""" + raise NotImplementedError + + # ---------- Environment Server Interface Methods ---------- + def reset(self, **kwargs: Any) -> StepResult[ObsT]: + """ + Reset the environment with optional parameters. + + Args: + **kwargs: Optional parameters passed to the environment's reset + method. Common parameters include: + - seed: Random seed for reproducibility + - episode_id: Custom episode identifier + - Any environment-specific reset parameters + + Returns: + StepResult containing initial observation + + Example: + >>> env.reset(seed=42, episode_id="ep-001") + """ + body: Dict[str, Any] = kwargs.copy() + r = self._http.post( + f"{self._base}/reset", + json=body, + headers=self._headers, + timeout=self._timeout, + ) + r.raise_for_status() + return self._parse_result(r.json()) + + def step(self, action: ActT, **kwargs: Any) -> StepResult[ObsT]: + """ + Execute an action in the environment with optional parameters. + + Args: + action: The action to execute + **kwargs: Optional parameters passed to the environment's step + method. Common parameters include: + - timeout_s: Execution timeout in seconds + - request_id: Request identifier for tracking + - render: Whether to render the environment + - Any environment-specific step parameters + + Returns: + StepResult containing observation, reward, and done status + + Example: + >>> env.step( + ... action, timeout_s=30.0, request_id="req-123", render=True + ... ) + """ + body: Dict[str, Any] = { + "action": self._step_payload(action), + **kwargs, # Forward all additional parameters + } + r = self._http.post( + f"{self._base}/step", + json=body, + headers=self._headers, + timeout=self._timeout, + ) + r.raise_for_status() + return self._parse_result(r.json()) + + def state(self) -> Any: + """ + Get the current environment state from the server. + + Returns: + State object with environment state information + (e.g., episode_id, step_count) + + Example: + >>> client = EchoEnv.from_docker_image("echo-env:latest") + >>> result = client.reset() + >>> state = client.state() + >>> print(state.episode_id) + >>> print(state.step_count) + """ + r = self._http.get( + f"{self._base}/state", + headers=self._headers, + timeout=self._timeout, + ) + r.raise_for_status() + return self._parse_state(r.json()) + + def close(self) -> None: + """ + Close the environment and clean up resources. + + If this client was created via from_docker_image(), this will stop + and remove the associated container. + """ + if self._provider is not None: + self._provider.stop_container()