Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 300 additions & 10 deletions src/core/containers/runtime/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Sequence


class ContainerProvider(ABC):
Expand Down Expand Up @@ -119,9 +119,7 @@ def __init__(self):
timeout=5,
)
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
raise RuntimeError(
"Docker is not available. Please install Docker Desktop or Docker Engine."
)
raise RuntimeError("Docker is not available. Please install Docker Desktop or Docker Engine.")

def start_container(
self,
Expand Down Expand Up @@ -154,10 +152,13 @@ def start_container(

# Build docker run command
cmd = [
"docker", "run",
"docker",
"run",
"-d", # Detached
"--name", self._container_name,
"-p", f"{port}:8000", # Map port
"--name",
self._container_name,
"-p",
f"{port}:8000", # Map port
]

# Add environment variables
Expand Down Expand Up @@ -241,9 +242,7 @@ def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:

time.sleep(0.5)

raise TimeoutError(
f"Container at {base_url} did not become ready within {timeout_s}s"
)
raise TimeoutError(f"Container at {base_url} did not become ready within {timeout_s}s")

def _find_available_port(self) -> int:
"""
Expand Down Expand Up @@ -277,6 +276,296 @@ def _generate_container_name(self, image: str) -> str:
return f"{clean_image}-{timestamp}"


class DockerSwarmProvider(ContainerProvider):
"""
Container provider that uses Docker Swarm services for local concurrency.

This provider creates a replicated Swarm service backed by the local Docker
engine. The built-in load-balancer fans requests across the replicas,
allowing multiple container instances to run concurrently on the developer
workstation (mirroring the workflow described in the Docker stack docs).
"""

def __init__(
self,
*,
auto_init_swarm: bool = True,
overlay_network: Optional[str] = None,
):
"""
Args:
auto_init_swarm: Whether to call ``docker swarm init`` when Swarm
is not active. Otherwise, user must manually initialize Swarm.
overlay_network: Optional overlay network name for the service.
When provided, the network is created with
``docker network create --driver overlay --attachable`` if it
does not already exist.
"""
self._service_name: Optional[str] = None
self._service_id: Optional[str] = None
self._published_port: Optional[int] = None
self._overlay_network = overlay_network
self._auto_init_swarm = auto_init_swarm

self._ensure_docker_available()
self._ensure_swarm_initialized()
if self._overlay_network:
self._ensure_overlay_network(self._overlay_network)

def start_container(
self,
image: str,
port: Optional[int] = None,
env_vars: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> str:
"""
Start (or scale) a Swarm service for the given image.

Supported kwargs:
replicas (int): Number of container replicas (default: 2).
cpu_limit (float | str): CPU limit passed to ``--limit-cpu``.
memory_limit (str): Memory limit passed to ``--limit-memory``.
constraints (Sequence[str]): Placement constraints.
labels (Dict[str, str]): Service labels.
command (Sequence[str] | str): Override container command.
"""
import shlex
import subprocess
import time

allowed_kwargs = {
"replicas",
"cpu_limit",
"memory_limit",
"constraints",
"labels",
"command",
}
unknown = set(kwargs) - allowed_kwargs
if unknown:
raise ValueError(f"Unsupported kwargs for DockerSwarmProvider: {unknown}")

replicas = int(kwargs.get("replicas", 2))
cpu_limit = kwargs.get("cpu_limit")
memory_limit = kwargs.get("memory_limit")
constraints: Optional[Sequence[str]] = kwargs.get("constraints")
labels: Optional[Dict[str, str]] = kwargs.get("labels")
command_override = kwargs.get("command")

if port is None:
port = self._find_available_port()

self._service_name = self._generate_service_name(image)
self._published_port = port

cmd = [
"docker",
"service",
"create",
"--detach",
"--name",
self._service_name,
"--replicas",
str(max(1, replicas)),
"--publish",
f"{port}:8000",
]

if self._overlay_network:
cmd.extend(["--network", self._overlay_network])

if env_vars:
for key, value in env_vars.items():
cmd.extend(["--env", f"{key}={value}"])

if cpu_limit is not None:
cmd.extend(["--limit-cpu", str(cpu_limit)])

if memory_limit is not None:
cmd.extend(["--limit-memory", str(memory_limit)])

if constraints:
for constraint in constraints:
cmd.extend(["--constraint", constraint])

if labels:
for key, value in labels.items():
cmd.extend(["--label", f"{key}={value}"])

cmd.append(image)

if command_override:
if isinstance(command_override, str):
cmd.extend(shlex.split(command_override))
else:
cmd.extend(command_override)

try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
)
self._service_id = result.stdout.strip()
except subprocess.CalledProcessError as e:
error_msg = (
"Failed to start Docker Swarm service.\n"
f"Command: {' '.join(cmd)}\n"
f"Exit code: {e.returncode}\n"
f"Stdout: {e.stdout}\n"
f"Stderr: {e.stderr}"
)
raise RuntimeError(error_msg) from e

# Give Swarm a brief moment to schedule the tasks.
time.sleep(1.0)

return f"http://localhost:{port}"

def stop_container(self) -> None:
"""
Remove the Swarm service (and keep the Swarm manager running).
"""
if not self._service_name:
return

import subprocess

try:
subprocess.run(
["docker", "service", "rm", self._service_name],
capture_output=True,
check=True,
timeout=10,
)
except subprocess.CalledProcessError:
# Service may already be gone; ignore.
pass
finally:
self._service_name = None
self._service_id = None
self._published_port = None

def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
"""
Wait for *all* replicas to become healthy by polling /health.
"""
import time
import requests

deadline = time.time() + timeout_s
health_url = f"{base_url}/health"

while time.time() < deadline:
try:
response = requests.get(health_url, timeout=2.0)
if response.status_code == 200:
return
except requests.RequestException:
pass

time.sleep(0.5)

raise TimeoutError(f"Swarm service at {base_url} did not become ready within {timeout_s}s")

def _ensure_docker_available(self) -> None:
import subprocess

try:
subprocess.run(
["docker", "version"],
check=True,
capture_output=True,
timeout=5,
)
except (
subprocess.CalledProcessError,
FileNotFoundError,
subprocess.TimeoutExpired,
) as exc:
raise RuntimeError("Docker is not available. Please install Docker Desktop or Docker Engine.") from exc

def _ensure_swarm_initialized(self) -> None:
import subprocess

try:
result = subprocess.run(
["docker", "info", "--format", "{{.Swarm.LocalNodeState}}"],
capture_output=True,
text=True,
check=True,
timeout=5,
)
state = result.stdout.strip().lower()
if state == "active":
return
except subprocess.CalledProcessError:
state = "unknown"

if not self._auto_init_swarm:
raise RuntimeError(
f"Docker Swarm is not active (state={state}). Enable Swarm manually or pass auto_init_swarm=True."
)

try:
subprocess.run(
["docker", "swarm", "init"],
check=True,
capture_output=True,
timeout=10,
)
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to initialize Docker Swarm") from e

def _ensure_overlay_network(self, network: str) -> None:
import subprocess

inspect = subprocess.run(
["docker", "network", "inspect", network],
capture_output=True,
text=True,
check=False,
)
if inspect.returncode == 0:
return

try:
subprocess.run(
[
"docker",
"network",
"create",
"--driver",
"overlay",
"--attachable",
network,
],
check=True,
capture_output=True,
timeout=10,
)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to create overlay network '{network}'") from e

def _find_available_port(self) -> int:
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
return port

def _generate_service_name(self, image: str) -> str:
import time

clean_image = image.split("/")[-1].split(":")[0]
timestamp = int(time.time() * 1000)
return f"{clean_image}-swarm-{timestamp}"


class KubernetesProvider(ContainerProvider):
"""
Container provider for Kubernetes clusters.
Expand All @@ -290,4 +579,5 @@ class KubernetesProvider(ContainerProvider):
>>> # Pod running in k8s, accessible via service or port-forward
>>> provider.stop_container()
"""

pass