From c2512139f15063df25c2d2f6bd6c2b1683b2408b Mon Sep 17 00:00:00 2001 From: Maya Krishnan Date: Tue, 12 May 2026 19:08:11 -0400 Subject: [PATCH 1/2] Also scan 'default' namespace in celery autoscaler (#828) * Also scan 'default' namespace in celery autoscaler PR #770 scoped the autoscaler's deployment scan to a single namespace (hmi_config.endpoint_namespace, i.e. 'scale-deploy') for startup speed. This inadvertently broke autoscaling for non-launch celery deployments that live in the 'default' namespace, e.g. nucleus-embed-image-clip-continuous-sqs, which use the same celery autoscaler annotations but are not model-engine async endpoints. Add 'default' as a second namespace to scan, restoring the previous behavior for those deployments while keeping the startup-speed win. A follow-up could make this configurable via env var; hardcoding for now to keep this change small and surgical. Co-Authored-By: Claude Opus 4.7 (1M context) * Pin types-setuptools below 82.0.0.20260508 to fix mypy CI The 2026-05-08 release of types-setuptools tightened the type for `package_data` in ways that fail strict mypy on clients/python/setup.py. Pin to the previous compatible version range. mypy --install-types spawns its own `pip install`, so add PIP_CONSTRAINT pointing at requirements-dev.txt so the pin is honored for transitive deps too (types-pyOpenSSL pulls in types-cffi which otherwise upgrades types-setuptools). Co-Authored-By: Claude Opus 4.7 (1M context) * Revert "Pin types-setuptools below 82.0.0.20260508 to fix mypy CI" This reverts commit 439555a6802209f671452fb732caf0411c453d62. * Address review: harden per-namespace errors, dedupe, ignore stub regression - celery_autoscaler: wrap list_namespaced_deployment in try/except ApiException per namespace so a failure in one (e.g. missing RBAC in "default") doesn't silence autoscaling for the other. - celery_autoscaler: dedupe namespaces_to_scan via dict.fromkeys in case hmi_config.endpoint_namespace is "default" in a dev/test env. - clients/python/setup.py: add `# type: ignore[arg-type]` for package_data to work around a type stub regression in types-setuptools 82.0.0.20260508. Co-Authored-By: Claude Opus 4.7 (1M context) * Fix black formatting on logger.error call --------- Co-authored-by: Claude Opus 4.7 (1M context) --- clients/python/setup.py | 2 +- .../core/celery/celery_autoscaler.py | 92 +++++++++++-------- 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/clients/python/setup.py b/clients/python/setup.py index 52d9e447e..152ae5102 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -5,5 +5,5 @@ python_requires=">=3.8", version="0.0.0.beta45", packages=find_packages(), - package_data={"llmengine": ["py.typed"]}, + package_data={"llmengine": ["py.typed"]}, # type: ignore[arg-type] ) diff --git a/model-engine/model_engine_server/core/celery/celery_autoscaler.py b/model-engine/model_engine_server/core/celery/celery_autoscaler.py index 923244a66..ca1498c07 100644 --- a/model-engine/model_engine_server/core/celery/celery_autoscaler.py +++ b/model-engine/model_engine_server/core/celery/celery_autoscaler.py @@ -80,57 +80,69 @@ async def list_deployments(apps_api) -> Dict[Tuple[str, str], CeleryAutoscalerPa from model_engine_server.common.config import hmi_config endpoint_namespace = hmi_config.endpoint_namespace + # Also scan "default" so non-launch celery deployments (e.g. nucleus workers) are still + # autoscaled. Previously the autoscaler scanned every namespace; #770 scoped it to a single + # namespace for startup speed but inadvertently dropped these. + # Dedupe to avoid double-scanning if endpoint_namespace itself is "default". + namespaces_to_scan = list(dict.fromkeys([endpoint_namespace, "default"])) celery_deployments_params = {} - namespace_start_time = time.time() - deployments = await apps_api.list_namespaced_deployment(namespace=endpoint_namespace) - logger.info( - f"list_namespaced_deployment in {endpoint_namespace} took {time.time() - namespace_start_time} seconds" - ) - for deployment in deployments.items: - deployment_name = deployment.metadata.name - annotations = deployment.metadata.annotations - - if not annotations: + for namespace_name in namespaces_to_scan: + namespace_start_time = time.time() + try: + deployments = await apps_api.list_namespaced_deployment(namespace=namespace_name) + except ApiException as exc: + # Don't let a failure in one namespace (e.g. missing RBAC) wipe out scaling for the + # other. Log and move on; the next iteration of the outer loop will retry. + logger.error(f"Failed to list deployments in namespace {namespace_name}: {exc}") continue + logger.info( + f"list_namespaced_deployment in {namespace_name} took {time.time() - namespace_start_time} seconds" + ) + for deployment in deployments.items: + deployment_name = deployment.metadata.name + annotations = deployment.metadata.annotations - # Parse parameters - params = {} + if not annotations: + continue - if "celery.scaleml.autoscaler/broker" in annotations: - deployment_broker = annotations["celery.scaleml.autoscaler/broker"] - else: - deployment_broker = ELASTICACHE_REDIS_BROKER + # Parse parameters + params = {} - if deployment_broker != autoscaler_broker: - logger.debug( - f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}" - ) - continue + if "celery.scaleml.autoscaler/broker" in annotations: + deployment_broker = annotations["celery.scaleml.autoscaler/broker"] + else: + deployment_broker = ELASTICACHE_REDIS_BROKER - for f in dataclasses.fields(CeleryAutoscalerParams): - k = f.name - v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}") - if not v: + if deployment_broker != autoscaler_broker: + logger.debug( + f"Skipping deployment {deployment_name}; deployment's broker {deployment_broker} is not {autoscaler_broker}" + ) continue - try: - if k == "task_visibility": - v = TaskVisibility.from_name(v) - v = f.type(v) - except (ValueError, KeyError): - logger.exception(f"Unable to convert {f.name}: {v} to {f.type}") + for f in dataclasses.fields(CeleryAutoscalerParams): + k = f.name + v = annotations.get(f"celery.scaleml.autoscaler/{stringcase.camelcase(k)}") + if not v: + continue - params[k] = v + try: + if k == "task_visibility": + v = TaskVisibility.from_name(v) + v = f.type(v) + except (ValueError, KeyError): + logger.exception(f"Unable to convert {f.name}: {v} to {f.type}") - try: - celery_autoscaler_params = CeleryAutoscalerParams(**params) - except TypeError: - logger.debug( - f"Missing params, skipping deployment : {deployment_name} in {endpoint_namespace}" - ) - continue + params[k] = v + + try: + celery_autoscaler_params = CeleryAutoscalerParams(**params) + except TypeError: + logger.debug( + f"Missing params, skipping deployment : {deployment_name} in {namespace_name}" + ) + continue - celery_deployments_params[(deployment_name, endpoint_namespace)] = celery_autoscaler_params + celery_deployments_params[(deployment_name, namespace_name)] = celery_autoscaler_params return celery_deployments_params From 48ad144f3348ced531352069384c39312b118ee6 Mon Sep 17 00:00:00 2001 From: Maya Krishnan Date: Wed, 13 May 2026 12:58:52 +0000 Subject: [PATCH 2/2] Cherry-pick temporal_task_queue migration to unblock deploy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The prod DB currently has migration b2c3d4e5f6g7 applied, from lilyz-ai/temporal-endpoint-type branch (commit 04729cef, not yet on main). Any deploy from a base that doesn't include this migration fails the alembic pre-upgrade hook with: alembic.util.exc.CommandError: Can't locate revision identified by 'b2c3d4e5f6g7' Bring just the migration file in (the rest of the temporal feature code is intentionally NOT cherry-picked here). The migration adds a single nullable column temporal_task_queue on the endpoints table. Since the column is nullable and our ORM model doesn't reference it, this is purely a schema-acknowledgement change — no behavior impact on the autoscaler fix in c2512139. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...d4e5f6g7_add_temporal_task_queue_column.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py new file mode 100644 index 000000000..a411202d4 --- /dev/null +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py @@ -0,0 +1,31 @@ +"""add temporal_task_queue column + +Revision ID: b2c3d4e5f6g7 +Revises: a1b2c3d4e5f6 +Create Date: 2026-04-24 00:00:00.000000 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'b2c3d4e5f6g7' +down_revision = 'a1b2c3d4e5f6' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + 'endpoints', + sa.Column('temporal_task_queue', sa.String, nullable=True), + schema='hosted_model_inference', + ) + + +def downgrade() -> None: + op.drop_column( + 'endpoints', + 'temporal_task_queue', + schema='hosted_model_inference', + )