Skip to content

Commit a475858

Browse files
Bihan Rana
authored and committed
Resolve comments and kubernetes sglang router integration
Test gateway package update Test gateway package update Test gateway package update Test gateway package update Test gateway package update Resolve rate limits and location issue Resolve rate limits and location issue Resolve rate limits and location issue Resolve all major comments Resolve all major comments Resolve kubernetes gateway issue with sglang integration
1 parent 5699556 commit a475858

File tree

11 files changed

+147
-152
lines changed

11 files changed

+147
-152
lines changed

gateway/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ requires-python = ">=3.10"
1111
dynamic = ["version"]
1212
dependencies = [
1313
# release builds of dstack-gateway depend on a PyPI version of dstack instead
14-
"dstack[gateway] @ git+https://github.com/Bihan/dstack.git@add_sglang_router_support",
14+
"dstack[gateway] @ https://github.com/Bihan/dstack/archive/refs/heads/add_sglang_router_support.tar.gz",
1515
]
1616

1717
[project.optional-dependencies]

src/dstack/_internal/core/backends/base/compute.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,26 +1014,25 @@ def get_latest_runner_build() -> Optional[str]:
10141014
return None
10151015

10161016

1017-
def get_dstack_gateway_wheel(build: str) -> str:
1017+
def get_dstack_gateway_wheel(build: str, router: Optional[AnyRouterConfig] = None) -> str:
10181018
channel = "release" if settings.DSTACK_RELEASE else "stgn"
10191019
base_url = f"https://dstack-gateway-downloads.s3.amazonaws.com/{channel}"
10201020
if build == "latest":
10211021
r = requests.get(f"{base_url}/latest-version", timeout=5)
10221022
r.raise_for_status()
10231023
build = r.text.strip()
10241024
logger.debug("Found the latest gateway build: %s", build)
1025-
# return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl"
1026-
return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.1-py3-none-any.whl"
1025+
# wheel = f"{base_url}/dstack_gateway-{build}-py3-none-any.whl"
1026+
wheel = "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.1-py3-none-any.whl"
1027+
# Build package spec with extras if router is specified
1028+
if router:
1029+
return f"dstack-gateway[{router.type}] @ {wheel}"
1030+
return f"dstack-gateway @ {wheel}"
10271031

10281032

10291033
def get_dstack_gateway_commands(router: Optional[AnyRouterConfig] = None) -> List[str]:
10301034
build = get_dstack_runner_version()
1031-
wheel = get_dstack_gateway_wheel(build)
1032-
# Build package spec with extras if router is specified
1033-
if router:
1034-
gateway_package = f"dstack-gateway[{router.type}] @ {wheel}"
1035-
else:
1036-
gateway_package = f"dstack-gateway @ {wheel}"
1035+
gateway_package = get_dstack_gateway_wheel(build, router)
10371036
return [
10381037
"mkdir -p /home/ubuntu/dstack",
10391038
"python3 -m venv /home/ubuntu/dstack/blue",

src/dstack/_internal/core/backends/kubernetes/compute.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import shlex
12
import subprocess
23
import tempfile
34
import threading
@@ -51,6 +52,7 @@
5152
SSHConnectionParams,
5253
)
5354
from dstack._internal.core.models.resources import CPUSpec, Memory
55+
from dstack._internal.core.models.routers import AnyRouterConfig
5456
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
5557
from dstack._internal.core.models.volumes import Volume
5658
from dstack._internal.utils.common import parse_memory
@@ -403,7 +405,9 @@ def create_gateway(
403405
# Consider deploying an NLB. It seems it requires some extra configuration on the cluster:
404406
# https://docs.aws.amazon.com/eks/latest/userguide/network-load-balancing.html
405407
instance_name = generate_unique_gateway_instance_name(configuration)
406-
commands = _get_gateway_commands(authorized_keys=[configuration.ssh_key_pub])
408+
commands = _get_gateway_commands(
409+
authorized_keys=[configuration.ssh_key_pub], router=configuration.router
410+
)
407411
pod = client.V1Pod(
408412
metadata=client.V1ObjectMeta(
409413
name=instance_name,
@@ -940,9 +944,13 @@ def _add_authorized_key_to_jump_pod(
940944
)
941945

942946

943-
def _get_gateway_commands(authorized_keys: List[str]) -> List[str]:
947+
def _get_gateway_commands(
948+
authorized_keys: List[str], router: Optional[AnyRouterConfig] = None
949+
) -> List[str]:
944950
authorized_keys_content = "\n".join(authorized_keys).strip()
945-
gateway_commands = " && ".join(get_dstack_gateway_commands())
951+
gateway_commands = " && ".join(get_dstack_gateway_commands(router=router))
952+
quoted_gateway_commands = shlex.quote(gateway_commands)
953+
946954
commands = [
947955
# install packages
948956
"apt-get update && apt-get install -y sudo wget openssh-server nginx python3.10-venv libaugeas0",
@@ -971,7 +979,7 @@ def _get_gateway_commands(authorized_keys: List[str]) -> List[str]:
971979
# start sshd
972980
"/usr/sbin/sshd -p 22 -o PermitUserEnvironment=yes",
973981
# run gateway
974-
f"su ubuntu -c '{gateway_commands}'",
982+
f"su ubuntu -c {quoted_gateway_commands}",
975983
"sleep infinity",
976984
]
977985
return commands

src/dstack/_internal/proxy/gateway/model_routers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
def get_router(router: AnyRouterConfig, context: Optional[RouterContext] = None) -> Router:
1111
if router.type == "sglang":
12-
return SglangRouter(router=router, context=context)
12+
return SglangRouter(config=router, context=context)
1313
raise ProxyError(f"Router type '{router.type}' is not available")
1414

1515

src/dstack/_internal/proxy/gateway/model_routers/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ class Router(ABC):
2828

2929
def __init__(
3030
self,
31-
router: Optional[AnyRouterConfig] = None,
31+
config: Optional[AnyRouterConfig] = None,
3232
context: Optional[RouterContext] = None,
3333
):
3434
"""Initialize router with context.
3535
3636
Args:
37-
router: Optional router configuration (implementation-specific)
37+
config: Optional router configuration (implementation-specific)
3838
context: Runtime context for the router (host, port, logging, etc.)
3939
"""
4040
self.context = context or RouterContext()

src/dstack/_internal/proxy/gateway/model_routers/sglang.py

Lines changed: 71 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import json
21
import shutil
32
import subprocess
3+
import sys
44
import time
55
import urllib.parse
66
from typing import List, Optional
77

8+
import httpx
9+
810
from dstack._internal.core.models.routers import RouterType, SGLangRouterConfig
9-
from dstack._internal.proxy.gateway.const import DSTACK_DIR_ON_GATEWAY
1011
from dstack._internal.proxy.lib.errors import UnexpectedProxyError
1112
from dstack._internal.utils.logging import get_logger
1213

@@ -20,50 +21,43 @@ class SglangRouter(Router):
2021

2122
TYPE = RouterType.SGLANG
2223

23-
def __init__(self, router: SGLangRouterConfig, context: Optional[RouterContext] = None):
24+
def __init__(self, config: SGLangRouterConfig, context: Optional[RouterContext] = None):
2425
"""Initialize SGLang router.
2526
2627
Args:
27-
router: SGLang router configuration (policy, cache_threshold, etc.)
28+
config: SGLang router configuration (policy, cache_threshold, etc.)
2829
context: Runtime context for the router (host, port, logging, etc.)
2930
"""
30-
super().__init__(router=router, context=context)
31-
self.config = router
31+
super().__init__(config=config, context=context)
32+
self.config = config
3233

3334
def start(self) -> None:
3435
try:
3536
logger.info("Starting sglang-router-new on port %s...", self.context.port)
3637

37-
# Determine active venv (blue or green)
38-
version_file = DSTACK_DIR_ON_GATEWAY / "version"
39-
if version_file.exists():
40-
version = version_file.read_text().strip()
41-
else:
42-
version = "blue"
43-
44-
venv_python = DSTACK_DIR_ON_GATEWAY / version / "bin" / "python3"
45-
38+
# Prometheus port is offset by 10000 from router port to keep it in a separate range
4639
prometheus_port = self.context.port + 10000
4740

4841
cmd = [
49-
str(venv_python),
42+
sys.executable,
5043
"-m",
5144
"sglang_router.launch_router",
5245
"--host",
53-
"0.0.0.0",
46+
self.context.host,
5447
"--port",
5548
str(self.context.port),
5649
"--prometheus-port",
5750
str(prometheus_port),
51+
"--prometheus-host",
52+
self.context.host,
5853
"--log-level",
5954
self.context.log_level,
6055
"--log-dir",
6156
str(self.context.log_dir),
57+
"--policy",
58+
self.config.policy,
6259
]
6360

64-
if hasattr(self.config, "policy") and self.config.policy:
65-
cmd.extend(["--policy", self.config.policy])
66-
6761
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
6862

6963
time.sleep(2)
@@ -129,12 +123,9 @@ def stop(self) -> None:
129123
def is_running(self) -> bool:
130124
"""Check if the SGLang router is running and responding to HTTP requests on the assigned port."""
131125
try:
132-
result = subprocess.run(
133-
["curl", "-s", f"http://{self.context.host}:{self.context.port}/workers"],
134-
capture_output=True,
135-
timeout=5,
136-
)
137-
return result.returncode == 0
126+
with httpx.Client(timeout=5.0) as client:
127+
response = client.get(f"http://{self.context.host}:{self.context.port}/workers")
128+
return response.status_code == 200
138129
except Exception as e:
139130
logger.error(f"Error checking sglang router status on port {self.context.port}: {e}")
140131
return False
@@ -151,8 +142,11 @@ def update_replicas(self, replica_urls: List[str]) -> None:
151142
for worker in current_workers:
152143
url = worker.get("url")
153144
if url and isinstance(url, str):
154-
current_worker_urls.add(url)
155-
target_worker_urls = set(replica_urls)
145+
# Normalize URL by removing trailing slashes to avoid path artifacts
146+
normalized_url = url.rstrip("/")
147+
current_worker_urls.add(normalized_url)
148+
# Normalize target URLs to ensure consistent comparison
149+
target_worker_urls = {url.rstrip("/") for url in replica_urls}
156150

157151
# Workers to add
158152
workers_to_add = target_worker_urls - current_worker_urls
@@ -186,87 +180,76 @@ def update_replicas(self, replica_urls: List[str]) -> None:
186180

187181
def _get_router_workers(self) -> List[dict]:
188182
try:
189-
result = subprocess.run(
190-
["curl", "-s", f"http://{self.context.host}:{self.context.port}/workers"],
191-
capture_output=True,
192-
timeout=5,
193-
)
194-
if result.returncode == 0:
195-
response = json.loads(result.stdout.decode())
196-
workers = response.get("workers", [])
197-
return workers
198-
return []
183+
with httpx.Client(timeout=5.0) as client:
184+
response = client.get(f"http://{self.context.host}:{self.context.port}/workers")
185+
if response.status_code == 200:
186+
response_data = response.json()
187+
workers = response_data.get("workers", [])
188+
return workers
189+
return []
199190
except Exception as e:
200191
logger.error(f"Error getting sglang router workers: {e}")
201192
return []
202193

203194
def _add_worker_to_router(self, worker_url: str) -> bool:
204195
try:
205196
payload = {"url": worker_url, "worker_type": "regular"}
206-
result = subprocess.run(
207-
[
208-
"curl",
209-
"-X",
210-
"POST",
197+
with httpx.Client(timeout=5.0) as client:
198+
response = client.post(
211199
f"http://{self.context.host}:{self.context.port}/workers",
212-
"-H",
213-
"Content-Type: application/json",
214-
"-d",
215-
json.dumps(payload),
216-
],
217-
capture_output=True,
218-
timeout=5,
219-
)
220-
221-
if result.returncode == 0:
222-
response = json.loads(result.stdout.decode())
223-
if response.get("status") == "accepted":
224-
logger.info(
225-
"Added worker %s to sglang router on port %s",
200+
json=payload,
201+
)
202+
if response.status_code == 200:
203+
response_data = response.json()
204+
if response_data.get("status") == "accepted":
205+
logger.info(
206+
"Added worker %s to sglang router on port %s",
207+
worker_url,
208+
self.context.port,
209+
)
210+
return True
211+
else:
212+
logger.error("Failed to add worker %s: %s", worker_url, response_data)
213+
return False
214+
else:
215+
logger.error(
216+
"Failed to add worker %s: status %d, %s",
226217
worker_url,
227-
self.context.port,
218+
response.status_code,
219+
response.text,
228220
)
229-
return True
230-
else:
231-
logger.error("Failed to add worker %s: %s", worker_url, response)
232221
return False
233-
else:
234-
logger.error("Failed to add worker %s: %s", worker_url, result.stderr.decode())
235-
return False
236222
except Exception as e:
237223
logger.error(f"Error adding worker {worker_url}: {e}")
238224
return False
239225

240226
def _remove_worker_from_router(self, worker_url: str) -> bool:
241227
try:
242228
encoded_url = urllib.parse.quote(worker_url, safe="")
243-
244-
result = subprocess.run(
245-
[
246-
"curl",
247-
"-X",
248-
"DELETE",
249-
f"http://{self.context.host}:{self.context.port}/workers/{encoded_url}",
250-
],
251-
capture_output=True,
252-
timeout=5,
253-
)
254-
255-
if result.returncode == 0:
256-
response = json.loads(result.stdout.decode())
257-
if response.get("status") == "accepted":
258-
logger.info(
259-
"Removed worker %s from sglang router on port %s",
229+
with httpx.Client(timeout=5.0) as client:
230+
response = client.delete(
231+
f"http://{self.context.host}:{self.context.port}/workers/{encoded_url}"
232+
)
233+
if response.status_code == 200:
234+
response_data = response.json()
235+
if response_data.get("status") == "accepted":
236+
logger.info(
237+
"Removed worker %s from sglang router on port %s",
238+
worker_url,
239+
self.context.port,
240+
)
241+
return True
242+
else:
243+
logger.error("Failed to remove worker %s: %s", worker_url, response_data)
244+
return False
245+
else:
246+
logger.error(
247+
"Failed to remove worker %s: status %d, %s",
260248
worker_url,
261-
self.context.port,
249+
response.status_code,
250+
response.text,
262251
)
263-
return True
264-
else:
265-
logger.error("Failed to remove worker %s: %s", worker_url, response)
266252
return False
267-
else:
268-
logger.error("Failed to remove worker %s: %s", worker_url, result.stderr.decode())
269-
return False
270253
except Exception as e:
271254
logger.error(f"Error removing worker {worker_url}: {e}")
272255
return False

0 commit comments

Comments
 (0)