Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
python_requires=">=3.8",
version="0.0.0.beta45",
packages=find_packages(),
package_data={"llmengine": ["py.typed"]},
package_data={"llmengine": ["py.typed"]}, # type: ignore[arg-type]
)
92 changes: 52 additions & 40 deletions model-engine/model_engine_server/core/celery/celery_autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,57 +80,69 @@ async def list_deployments(apps_api) -> Dict[Tuple[str, str], CeleryAutoscalerPa
from model_engine_server.common.config import hmi_config

endpoint_namespace = hmi_config.endpoint_namespace
# Also scan "default" so non-launch celery deployments (e.g. nucleus workers) are still
# autoscaled. Previously the autoscaler scanned every namespace; #770 scoped it to a single
# namespace for startup speed but inadvertently dropped these.
# Dedupe to avoid double-scanning if endpoint_namespace itself is "default".
namespaces_to_scan = list(dict.fromkeys([endpoint_namespace, "default"]))
celery_deployments_params = {}
namespace_start_time = time.time()
deployments = await apps_api.list_namespaced_deployment(namespace=endpoint_namespace)
logger.info(
f"list_namespaced_deployment in {endpoint_namespace} took {time.time() - namespace_start_time} seconds"
)
for deployment in deployments.items:
deployment_name = deployment.metadata.name
annotations = deployment.metadata.annotations

if not annotations:
for namespace_name in namespaces_to_scan:
namespace_start_time = time.time()
try:
deployments = await apps_api.list_namespaced_deployment(namespace=namespace_name)
except ApiException as exc:
# Don't let a failure in one namespace (e.g. missing RBAC) wipe out scaling for the
# other. Log and move on; the next iteration of the outer loop will retry.
logger.error(f"Failed to list deployments in namespace {namespace_name}: {exc}")
continue
logger.info(
f"list_namespaced_deployment in {namespace_name} took {time.time() - namespace_start_time} seconds"
)
for deployment in deployments.items:
deployment_name = deployment.metadata.name
annotations = deployment.metadata.annotations

# Parse parameters
params = {}
if not annotations:
continue

if "celery.scaleml.autoscaler/broker" in annotations:
deployment_broker = annotations["celery.scaleml.autoscaler/broker"]
else:
deployment_broker = ELASTICACHE_REDIS_BROKER
# Parse parameters
params = {}

if deployment_broker != autoscaler_broker:
logger.debug(
f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}"
)
continue
if "celery.scaleml.autoscaler/broker" in annotations:
deployment_broker = annotations["celery.scaleml.autoscaler/broker"]
else:
deployment_broker = ELASTICACHE_REDIS_BROKER

for f in dataclasses.fields(CeleryAutoscalerParams):
k = f.name
v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}")
if not v:
if deployment_broker != autoscaler_broker:
logger.debug(
f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}"
)
continue

try:
if k == "task_visibility":
v = TaskVisibility.from_name(v)
v = f.type(v)
except (ValueError, KeyError):
logger.exception(f"Unable to convert {f.name}: {v} to {f.type}")
for f in dataclasses.fields(CeleryAutoscalerParams):
k = f.name
v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}")
if not v:
continue

params[k] = v
try:
if k == "task_visibility":
v = TaskVisibility.from_name(v)
v = f.type(v)
except (ValueError, KeyError):
logger.exception(f"Unable to convert {f.name}: {v} to {f.type}")

try:
celery_autoscaler_params = CeleryAutoscalerParams(**params)
except TypeError:
logger.debug(
f"Missing params, skipping deployment : {deployment_name} in {endpoint_namespace}"
)
continue
params[k] = v

try:
celery_autoscaler_params = CeleryAutoscalerParams(**params)
except TypeError:
logger.debug(
f"Missing params, skipping deployment : {deployment_name} in {namespace_name}"
)
continue

celery_deployments_params[(deployment_name, endpoint_namespace)] = celery_autoscaler_params
celery_deployments_params[(deployment_name, namespace_name)] = celery_autoscaler_params

return celery_deployments_params

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""add temporal_task_queue column

Revision ID: b2c3d4e5f6g7
Revises: a1b2c3d4e5f6
Create Date: 2026-04-24 00:00:00.000000

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID; `down_revision` is the migration it revises.
revision = 'b2c3d4e5f6g7'
down_revision = 'a1b2c3d4e5f6'
# No branch labels or cross-revision dependencies are declared for this migration.
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the ``temporal_task_queue`` column to ``hosted_model_inference.endpoints``.

    The column is a nullable string and is created without a default value.
    """
    temporal_task_queue = sa.Column('temporal_task_queue', sa.String, nullable=True)
    op.add_column('endpoints', temporal_task_queue, schema='hosted_model_inference')


def downgrade() -> None:
    """Drop the ``temporal_task_queue`` column from ``hosted_model_inference.endpoints``.

    This reverses the column addition performed by ``upgrade``.
    """
    op.drop_column('endpoints', 'temporal_task_queue', schema='hosted_model_inference')