Skip to content

Commit b278b1a

Browse files
Bihan  RanaBihan  Rana
authored andcommitted
Test sglang router per service implementation
1 parent 1206531 commit b278b1a

File tree

2 files changed

+131
-18
lines changed

2 files changed

+131
-18
lines changed

src/dstack/_internal/proxy/gateway/model_routers/sglang_new.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ def __init__(self, router: SGLangNewRouterConfig, context: Optional[RouterContex
3131
self._domain: Optional[str] = None # domain for this router instance
3232
self._replica_urls: List[str] = [] # List of worker URLs registered with this router
3333
self._domain_to_ports: Dict[str, List[int]] = {} # domain -> allocated worker ports
34-
self._next_worker_port: int = (
35-
12000 # Starting port for worker endpoints (avoid router port range 10001-11999)
36-
)
34+
self._next_worker_port: int = 10001 # Starting port for worker endpoints (managed globally by Nginx, range 10001-11999)
3735

3836
def start(self) -> None:
3937
"""Start the SGLang router process."""

src/dstack/_internal/proxy/gateway/services/nginx.py

Lines changed: 130 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from dstack._internal.core.models.routers import AnyRouterConfig
1414
from dstack._internal.proxy.gateway.const import PROXY_PORT_ON_GATEWAY
1515
from dstack._internal.proxy.gateway.model_routers import (
16+
Replica,
1617
Router,
1718
RouterContext,
1819
get_router,
@@ -90,7 +91,16 @@ def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None:
9091
# For sglang_new: 1:1 service-to-router mapping
9192
self._router_port_to_domain: Dict[int, str] = {} # router_port -> domain
9293
self._domain_to_router: Dict[str, Router] = {} # domain -> router instance
93-
self._next_router_port: int = 10001 # Starting port for routers (10001-11999)
94+
# Fixed port ranges (avoiding ephemeral ports which typically start at 32768)
95+
self._ROUTER_PORT_MIN: int = 20000 # Router port range: 20000-24999
96+
self._ROUTER_PORT_MAX: int = 24999
97+
self._WORKER_PORT_MIN: int = 10001 # Worker port range: 10001-11999
98+
self._WORKER_PORT_MAX: int = 11999
99+
self._next_router_port: int = self._ROUTER_PORT_MIN
100+
# Global tracking of worker ports to avoid conflicts across router instances
101+
self._allocated_worker_ports: set[int] = set() # Set of all allocated worker ports
102+
self._domain_to_worker_ports: Dict[str, list[int]] = {} # domain -> list of worker ports
103+
self._next_worker_port: int = self._WORKER_PORT_MIN
94104

95105
async def register(self, conf: SiteConfig, acme: ACMESettings) -> None:
96106
logger.debug("Registering %s domain %s", conf.type, conf.domain)
@@ -135,31 +145,67 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None:
135145
if not await run_async(router.is_running):
136146
await run_async(router.start)
137147

148+
# Free old worker ports if domain already has allocated ports (e.g., scaling replicas)
149+
if conf.domain in self._domain_to_worker_ports:
150+
old_worker_ports = self._domain_to_worker_ports[conf.domain]
151+
for port in old_worker_ports:
152+
self._allocated_worker_ports.discard(port)
153+
logger.debug(
154+
"Freed old worker ports %s for domain %s (scaling replicas)",
155+
old_worker_ports,
156+
conf.domain,
157+
)
158+
159+
# Allocate worker ports globally to avoid conflicts across router instances
160+
allocated_ports = self._allocate_worker_ports(len(conf.replicas))
161+
# Track worker ports for this domain
162+
self._domain_to_worker_ports[conf.domain] = allocated_ports
163+
138164
# Register replicas (no model_id needed for new sglang implementation)
139-
# This allocates worker ports and returns Replica objects
165+
# Pass pre-allocated ports to router
140166
replicas = await run_async(
141167
router.register_replicas,
142168
conf.domain,
143169
len(conf.replicas),
144170
None, # model_id not required for new sglang implementation
145171
)
146172

147-
# Extract allocated worker ports from replicas
148-
allocated_ports = [int(r.url.rsplit(":", 1)[-1]) for r in replicas]
173+
# Update replicas with the globally allocated ports
174+
# (router may have allocated different ports, so we override)
175+
replicas = [
176+
Replica(url=f"http://{router.context.host}:{port}", model=r.model)
177+
for r, port in zip(replicas, allocated_ports)
178+
]
149179

150180
# Write router workers config
151181
try:
152182
if conf.replicas:
153183
await run_async(self.write_router_workers_conf, conf, allocated_ports)
154184
except Exception as e:
185+
# Free allocated worker ports on error
186+
for port in allocated_ports:
187+
self._allocated_worker_ports.discard(port)
188+
if conf.domain in self._domain_to_worker_ports:
189+
del self._domain_to_worker_ports[conf.domain]
155190
logger.exception(
156191
"write_router_workers_conf failed for domain=%s: %s", conf.domain, e
157192
)
158193
raise
159194

160195
# Add replicas to router (actual HTTP API calls to add workers)
161196
# For new sglang implementation, we add workers with their allocated ports
162-
await run_async(router.add_replicas, replicas)
197+
try:
198+
await run_async(router.add_replicas, replicas)
199+
except Exception as e:
200+
# Free allocated worker ports on error
201+
for port in allocated_ports:
202+
self._allocated_worker_ports.discard(port)
203+
if conf.domain in self._domain_to_worker_ports:
204+
del self._domain_to_worker_ports[conf.domain]
205+
logger.exception(
206+
"Failed to add replicas to router for domain=%s: %s", conf.domain, e
207+
)
208+
raise
163209

164210
# Handle legacy sglang router type (shared router with IGW) - deprecated
165211
elif conf.router.type == "sglang_deprecated" and conf.model_id:
@@ -217,6 +263,15 @@ async def unregister(self, domain: str) -> None:
217263
if router_port in self._router_port_to_domain:
218264
del self._router_port_to_domain[router_port]
219265
del self._domain_to_router[domain]
266+
267+
# Free up worker ports for this domain
268+
if domain in self._domain_to_worker_ports:
269+
worker_ports = self._domain_to_worker_ports[domain]
270+
for port in worker_ports:
271+
self._allocated_worker_ports.discard(port)
272+
del self._domain_to_worker_ports[domain]
273+
logger.debug("Freed worker ports %s for domain %s", worker_ports, domain)
274+
220275
# Remove workers config file
221276
workers_conf_path = self._conf_dir / f"router-workers.{domain}.conf"
222277
if workers_conf_path.exists():
@@ -320,44 +375,104 @@ def _is_port_available(port: int) -> bool:
320375
return False
321376

322377
def _allocate_router_port(self) -> int:
323-
"""Allocate next available router port in range 10001-11999.
378+
"""Allocate next available router port in fixed range (20000-24999).
324379
325380
Checks both our internal allocation map and actual port availability
326-
to avoid conflicts with other services (e.g., Prometheus).
381+
to avoid conflicts with other services. Range chosen to avoid ephemeral ports.
327382
"""
328383
port = self._next_router_port
329-
max_attempts = 1999 # Maximum ports in range 10001-11999
384+
max_attempts = self._ROUTER_PORT_MAX - self._ROUTER_PORT_MIN + 1
330385
attempts = 0
331386

332387
while attempts < max_attempts:
333388
# Check if port is already allocated by us
334389
if port in self._router_port_to_domain:
335390
port += 1
336-
if port > 11999:
337-
port = 10001 # Wrap around
391+
if port > self._ROUTER_PORT_MAX:
392+
port = self._ROUTER_PORT_MIN # Wrap around
338393
attempts += 1
339394
continue
340395

341396
# Check if port is actually available on the system
342397
if self._is_port_available(port):
343398
# Port is available, allocate it
344399
self._next_router_port = port + 1
345-
if self._next_router_port > 11999:
346-
self._next_router_port = 10001 # Wrap around
400+
if self._next_router_port > self._ROUTER_PORT_MAX:
401+
self._next_router_port = self._ROUTER_PORT_MIN # Wrap around
347402
logger.debug("Allocated router port %s", port)
348403
return port
349404

350405
# Port is in use, try next one
351406
logger.debug("Port %s is in use, trying next port", port)
352407
port += 1
353-
if port > 11999:
354-
port = 10001 # Wrap around
408+
if port > self._ROUTER_PORT_MAX:
409+
port = self._ROUTER_PORT_MIN # Wrap around
355410
attempts += 1
356411

357412
raise UnexpectedProxyError(
358-
"Router port range exhausted (10001-11999). All ports in range appear to be in use."
413+
f"Router port range exhausted ({self._ROUTER_PORT_MIN}-{self._ROUTER_PORT_MAX}). "
414+
"All ports in range appear to be in use."
359415
)
360416

417+
def _allocate_worker_ports(self, num_ports: int) -> list[int]:
418+
"""Allocate worker ports globally in fixed range (10001-11999).
419+
420+
Worker ports are used by nginx to listen and proxy to worker sockets.
421+
They must be unique across all router instances. Range chosen to avoid ephemeral ports.
422+
423+
Args:
424+
num_ports: Number of worker ports to allocate
425+
426+
Returns:
427+
List of allocated worker port numbers
428+
"""
429+
allocated = []
430+
port = self._next_worker_port
431+
max_attempts = (self._WORKER_PORT_MAX - self._WORKER_PORT_MIN + 1) * 2 # Allow wrap-around
432+
attempts = 0
433+
434+
while len(allocated) < num_ports and attempts < max_attempts:
435+
# Check if port is already allocated globally
436+
if port in self._allocated_worker_ports:
437+
port += 1
438+
if port > self._WORKER_PORT_MAX:
439+
port = self._WORKER_PORT_MIN # Wrap around
440+
attempts += 1
441+
continue
442+
443+
# Check if port is actually available on the system
444+
if self._is_port_available(port):
445+
allocated.append(port)
446+
self._allocated_worker_ports.add(port)
447+
logger.debug("Allocated worker port %s", port)
448+
port += 1
449+
if port > self._WORKER_PORT_MAX:
450+
port = self._WORKER_PORT_MIN # Wrap around
451+
else:
452+
logger.debug("Worker port %s is in use, trying next port", port)
453+
port += 1
454+
if port > self._WORKER_PORT_MAX:
455+
port = self._WORKER_PORT_MIN # Wrap around
456+
457+
attempts += 1
458+
459+
if len(allocated) < num_ports:
460+
# Free up the ports we did allocate
461+
for p in allocated:
462+
self._allocated_worker_ports.discard(p)
463+
raise UnexpectedProxyError(
464+
f"Failed to allocate {num_ports} worker ports in range "
465+
f"({self._WORKER_PORT_MIN}-{self._WORKER_PORT_MAX}). "
466+
f"Only allocated {len(allocated)} ports after {attempts} attempts."
467+
)
468+
469+
# Update next worker port for next allocation
470+
self._next_worker_port = port
471+
if self._next_worker_port > self._WORKER_PORT_MAX:
472+
self._next_worker_port = self._WORKER_PORT_MIN # Wrap around
473+
474+
return allocated
475+
361476
def write_global_conf(self) -> None:
362477
conf = read_package_resource("00-log-format.conf")
363478
self.write_conf(conf, "00-log-format.conf")

0 commit comments

Comments
 (0)