diff --git a/clients/python/setup.py b/clients/python/setup.py index 52d9e447e..152ae5102 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -5,5 +5,5 @@ python_requires=">=3.8", version="0.0.0.beta45", packages=find_packages(), - package_data={"llmengine": ["py.typed"]}, + package_data={"llmengine": ["py.typed"]}, # type: ignore[arg-type] ) diff --git a/model-engine/model_engine_server/core/celery/celery_autoscaler.py b/model-engine/model_engine_server/core/celery/celery_autoscaler.py index 923244a66..ca1498c07 100644 --- a/model-engine/model_engine_server/core/celery/celery_autoscaler.py +++ b/model-engine/model_engine_server/core/celery/celery_autoscaler.py @@ -80,57 +80,69 @@ async def list_deployments(apps_api) -> Dict[Tuple[str, str], CeleryAutoscalerPa from model_engine_server.common.config import hmi_config endpoint_namespace = hmi_config.endpoint_namespace + # Also scan "default" so non-launch celery deployments (e.g. nucleus workers) are still + # autoscaled. Previously the autoscaler scanned every namespace; #770 scoped it to a single + # namespace for startup speed but inadvertently dropped these. + # Dedupe to avoid double-scanning if endpoint_namespace itself is "default". + namespaces_to_scan = list(dict.fromkeys([endpoint_namespace, "default"])) celery_deployments_params = {} - namespace_start_time = time.time() - deployments = await apps_api.list_namespaced_deployment(namespace=endpoint_namespace) - logger.info( - f"list_namespaced_deployment in {endpoint_namespace} took {time.time() - namespace_start_time} seconds" - ) - for deployment in deployments.items: - deployment_name = deployment.metadata.name - annotations = deployment.metadata.annotations - - if not annotations: + for namespace_name in namespaces_to_scan: + namespace_start_time = time.time() + try: + deployments = await apps_api.list_namespaced_deployment(namespace=namespace_name) + except ApiException as exc: + # Don't let a failure in one namespace (e.g. 
missing RBAC) wipe out scaling for the + # other. Log and move on; the next iteration of the outer loop will retry. + logger.error(f"Failed to list deployments in namespace {namespace_name}: {exc}") continue + logger.info( + f"list_namespaced_deployment in {namespace_name} took {time.time() - namespace_start_time} seconds" + ) + for deployment in deployments.items: + deployment_name = deployment.metadata.name + annotations = deployment.metadata.annotations - # Parse parameters - params = {} + if not annotations: + continue - if "celery.scaleml.autoscaler/broker" in annotations: - deployment_broker = annotations["celery.scaleml.autoscaler/broker"] - else: - deployment_broker = ELASTICACHE_REDIS_BROKER + # Parse parameters + params = {} - if deployment_broker != autoscaler_broker: - logger.debug( - f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}" - ) - continue + if "celery.scaleml.autoscaler/broker" in annotations: + deployment_broker = annotations["celery.scaleml.autoscaler/broker"] + else: + deployment_broker = ELASTICACHE_REDIS_BROKER - for f in dataclasses.fields(CeleryAutoscalerParams): - k = f.name - v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}") - if not v: + if deployment_broker != autoscaler_broker: + logger.debug( + f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}" + ) continue - try: - if k == "task_visibility": - v = TaskVisibility.from_name(v) - v = f.type(v) - except (ValueError, KeyError): - logger.exception(f"Unable to convert {f.name}: {v} to {f.type}") + for f in dataclasses.fields(CeleryAutoscalerParams): + k = f.name + v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}") + if not v: + continue - params[k] = v + try: + if k == "task_visibility": + v = TaskVisibility.from_name(v) + v = f.type(v) + except (ValueError, KeyError): + logger.exception(f"Unable to convert {f.name}: {v} 
"""add temporal_task_queue column

Revision ID: b2c3d4e5f6g7
Revises: a1b2c3d4e5f6
Create Date: 2026-04-24 00:00:00.000000

"""
import sqlalchemy as sa
from alembic import op

# Identifiers Alembic uses for this revision and the revision it revises.
revision = "b2c3d4e5f6g7"
down_revision = "a1b2c3d4e5f6"
branch_labels = None
depends_on = None

# Location of the column this migration manages; shared by both directions so
# upgrade() and downgrade() cannot drift apart.
_SCHEMA = "hosted_model_inference"
_TABLE = "endpoints"
_COLUMN = "temporal_task_queue"


def upgrade() -> None:
    """Add the nullable string column ``temporal_task_queue`` to ``endpoints``.

    The table is addressed inside the ``hosted_model_inference`` schema.
    """
    task_queue_column = sa.Column(_COLUMN, sa.String, nullable=True)
    op.add_column(_TABLE, task_queue_column, schema=_SCHEMA)


def downgrade() -> None:
    """Drop the ``temporal_task_queue`` column added by :func:`upgrade`."""
    op.drop_column(_TABLE, _COLUMN, schema=_SCHEMA)