class DockerSwarmProvider(ContainerProvider):
    """
    Container provider that uses Docker Swarm services for local concurrency.

    This provider creates a replicated Swarm service backed by the local Docker
    engine. The built-in load-balancer fans requests across the replicas,
    allowing multiple container instances to run concurrently on the developer
    workstation (mirroring the workflow described in the Docker stack docs).
    """

    # Upper bound on the length of a Swarm service name.
    _MAX_SERVICE_NAME_LEN = 63

    def __init__(
        self,
        *,
        auto_init_swarm: bool = True,
        overlay_network: Optional[str] = None,
    ):
        """
        Args:
            auto_init_swarm: Whether to call ``docker swarm init`` when Swarm
                is not active. Otherwise, user must manually initialize Swarm.
            overlay_network: Optional overlay network name for the service.
                When provided, the network is created with
                ``docker network create --driver overlay --attachable`` if it
                does not already exist.

        Raises:
            RuntimeError: If Docker is unavailable, Swarm is inactive and
                cannot (or may not) be initialized, or the overlay network
                cannot be created.
        """
        self._service_name: Optional[str] = None
        self._service_id: Optional[str] = None
        self._published_port: Optional[int] = None
        self._overlay_network = overlay_network
        self._auto_init_swarm = auto_init_swarm

        self._ensure_docker_available()
        self._ensure_swarm_initialized()
        if self._overlay_network:
            self._ensure_overlay_network(self._overlay_network)

    def start_container(
        self,
        image: str,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Create a new Swarm service for the given image.

        Every call creates a fresh service with a newly generated name; a
        service started by an earlier call is not scaled or removed here.

        Args:
            image: Image reference to run.
            port: Host port to publish (mapped to container port 8000). A free
                port is picked when omitted.
            env_vars: Environment variables passed via ``--env``.

        Supported kwargs:
            replicas (int): Number of container replicas (default: 2).
                Values below 1 are raised to 1.
            cpu_limit (float | str): CPU limit passed to ``--limit-cpu``.
            memory_limit (str): Memory limit passed to ``--limit-memory``.
            constraints (Sequence[str] | str): Placement constraints.
            labels (Dict[str, str]): Service labels.
            command (Sequence[str] | str): Override container command.

        Returns:
            Base URL of the published port, ``http://localhost:<port>``.

        Raises:
            ValueError: If an unsupported keyword argument is given.
            RuntimeError: If ``docker service create`` exits non-zero.
        """
        import subprocess
        import time

        allowed_kwargs = {
            "replicas",
            "cpu_limit",
            "memory_limit",
            "constraints",
            "labels",
            "command",
        }
        unknown = set(kwargs) - allowed_kwargs
        if unknown:
            raise ValueError(f"Unsupported kwargs for DockerSwarmProvider: {unknown}")

        if port is None:
            port = self._find_available_port()

        # Build (and thereby validate) the command before touching instance
        # state, so a bad argument does not leave a half-initialised handle.
        service_name = self._generate_service_name(image)
        cmd = self._build_service_create_cmd(service_name, image, port, env_vars, kwargs)

        self._service_name = service_name
        self._published_port = port

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            error_msg = (
                "Failed to start Docker Swarm service.\n"
                f"Command: {' '.join(cmd)}\n"
                f"Exit code: {e.returncode}\n"
                f"Stdout: {e.stdout}\n"
                f"Stderr: {e.stderr}"
            )
            raise RuntimeError(error_msg) from e

        # With --detach the CLI prints the service ID and returns immediately.
        self._service_id = result.stdout.strip()

        # Give Swarm a brief moment to schedule the tasks.
        time.sleep(1.0)

        return f"http://localhost:{port}"

    def _build_service_create_cmd(
        self,
        service_name: str,
        image: str,
        port: int,
        env_vars: Optional[Dict[str, str]],
        options: Dict[str, Any],
    ) -> list:
        """Assemble the ``docker service create`` argument list (no I/O)."""
        import shlex

        replicas = int(options.get("replicas", 2))
        cpu_limit = options.get("cpu_limit")
        memory_limit = options.get("memory_limit")
        constraints = options.get("constraints")
        labels: Optional[Dict[str, str]] = options.get("labels")
        command_override = options.get("command")

        cmd = [
            "docker",
            "service",
            "create",
            "--detach",
            "--name",
            service_name,
            "--replicas",
            str(max(1, replicas)),
            "--publish",
            f"{port}:8000",
        ]

        if self._overlay_network:
            cmd.extend(["--network", self._overlay_network])

        for key, value in (env_vars or {}).items():
            cmd.extend(["--env", f"{key}={value}"])

        if cpu_limit is not None:
            cmd.extend(["--limit-cpu", str(cpu_limit)])

        if memory_limit is not None:
            cmd.extend(["--limit-memory", str(memory_limit)])

        # A bare string is one constraint, not a sequence of characters.
        if isinstance(constraints, str):
            constraints = [constraints]
        for constraint in constraints or ():
            cmd.extend(["--constraint", constraint])

        for key, value in (labels or {}).items():
            cmd.extend(["--label", f"{key}={value}"])

        # Everything after the image is treated as the container command.
        cmd.append(image)

        if command_override:
            if isinstance(command_override, str):
                cmd.extend(shlex.split(command_override))
            else:
                cmd.extend(command_override)

        return cmd

    def stop_container(self) -> None:
        """
        Remove the Swarm service (and keep the Swarm manager running).

        Does nothing when no service has been started. A non-zero exit from
        ``docker service rm`` is ignored, since the service may already be
        gone. If the command times out, ``subprocess.TimeoutExpired``
        propagates and the service handle is kept so the caller can retry.
        """
        if not self._service_name:
            return

        import subprocess

        try:
            subprocess.run(
                ["docker", "service", "rm", self._service_name],
                capture_output=True,
                check=True,
                timeout=10,
            )
        except subprocess.CalledProcessError:
            # Service may already be gone; ignore.
            pass
        except subprocess.TimeoutExpired:
            # The service may still exist; keep the handle instead of
            # forgetting the only reference to it.
            raise

        self._service_name = None
        self._service_id = None
        self._published_port = None

    def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
        """
        Poll ``<base_url>/health`` until it answers with HTTP 200.

        A single 200 response ends the wait, so this confirms that the
        published port is being served; it does not verify each replica
        individually.

        Args:
            base_url: Base URL returned by ``start_container``.
            timeout_s: Maximum time to wait, in seconds.

        Raises:
            TimeoutError: If no 200 response arrives within ``timeout_s``.
        """
        import time
        import requests

        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout_s
        health_url = f"{base_url.rstrip('/')}/health"

        while time.monotonic() < deadline:
            try:
                response = requests.get(health_url, timeout=2.0)
                if response.status_code == 200:
                    return
            except requests.RequestException:
                # Not reachable yet; keep polling.
                pass

            time.sleep(0.5)

        raise TimeoutError(f"Swarm service at {base_url} did not become ready within {timeout_s}s")

    def _ensure_docker_available(self) -> None:
        """Raise RuntimeError unless ``docker version`` succeeds within 5s."""
        import subprocess

        try:
            subprocess.run(
                ["docker", "version"],
                check=True,
                capture_output=True,
                timeout=5,
            )
        except (
            subprocess.CalledProcessError,
            FileNotFoundError,
            subprocess.TimeoutExpired,
        ) as exc:
            raise RuntimeError("Docker is not available. Please install Docker Desktop or Docker Engine.") from exc

    def _ensure_swarm_initialized(self) -> None:
        """
        Make sure the local node is part of an active Swarm.

        Queries ``docker info`` for the local node state. When the state is
        not ``active``, runs ``docker swarm init`` if ``auto_init_swarm`` was
        enabled and raises otherwise.

        Raises:
            RuntimeError: If Swarm is inactive and auto-init is disabled, or
                if ``docker swarm init`` fails or times out.
        """
        import subprocess

        try:
            result = subprocess.run(
                ["docker", "info", "--format", "{{.Swarm.LocalNodeState}}"],
                capture_output=True,
                text=True,
                check=True,
                timeout=5,
            )
            state = result.stdout.strip().lower()
            if state == "active":
                return
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            # A timeout is treated like a failed query rather than escaping
            # as an unwrapped TimeoutExpired.
            state = "unknown"

        if not self._auto_init_swarm:
            raise RuntimeError(
                f"Docker Swarm is not active (state={state}). Enable Swarm manually or pass auto_init_swarm=True."
            )

        try:
            subprocess.run(
                ["docker", "swarm", "init"],
                check=True,
                capture_output=True,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            raise RuntimeError("Failed to initialize Docker Swarm") from e

    def _ensure_overlay_network(self, network: str) -> None:
        """
        Create the attachable overlay network ``network`` unless
        ``docker network inspect`` already finds a network of that name.

        Raises:
            RuntimeError: If ``docker network create`` fails or times out.
        """
        import subprocess

        inspect = subprocess.run(
            ["docker", "network", "inspect", network],
            capture_output=True,
            text=True,
            check=False,
        )
        if inspect.returncode == 0:
            return

        try:
            subprocess.run(
                [
                    "docker",
                    "network",
                    "create",
                    "--driver",
                    "overlay",
                    "--attachable",
                    network,
                ],
                check=True,
                capture_output=True,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            raise RuntimeError(f"Failed to create overlay network '{network}'") from e

    def _find_available_port(self) -> int:
        """
        Ask the OS for a currently free TCP port.

        The probing socket is closed before returning, so another process
        could still take the port before the service publishes it.
        """
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 lets the OS choose any unused port.
            s.bind(("", 0))
            s.listen(1)
            port = s.getsockname()[1]
        return port

    def _generate_service_name(self, image: str) -> str:
        """
        Derive a service name from the image reference.

        Uses the last path component of ``image`` without its tag or digest,
        replaces characters outside ``[A-Za-z0-9_-]`` with ``-``, and appends
        ``-swarm-<millisecond timestamp>``. The result is capped at
        ``_MAX_SERVICE_NAME_LEN`` characters.
        """
        import re
        import time

        # "registry:5000/team/img@sha256:..." or ".../img:tag" -> "img"
        base = image.split("/")[-1].split("@")[0].split(":")[0]
        base = re.sub(r"[^A-Za-z0-9_-]+", "-", base).strip("-_")

        suffix = f"-swarm-{int(time.time() * 1000)}"
        room = max(1, self._MAX_SERVICE_NAME_LEN - len(suffix))
        # Truncation may expose a trailing separator; strip it again.
        base = base[:room].rstrip("-_") or "service"
        return f"{base}{suffix}"
@@ -290,4 +579,5 @@ class KubernetesProvider(ContainerProvider): >>> # Pod running in k8s, accessible via service or port-forward >>> provider.stop_container() """ + pass