From 26ea71dbf7d2b0fa23a4dd3cc65c362d9cba2d66 Mon Sep 17 00:00:00 2001 From: fededagos Date: Mon, 1 Jun 2026 11:19:06 -0700 Subject: [PATCH 01/13] Add instances option to target specific fleet nodes Introduce an `instances` run profile option that pins a run to specific existing fleet instances (nodes). Each value matches an instance by its name (e.g. `my-fleet-0`) or by its hostname/IP address. When set, `filter_instances` keeps only matching instances and the job assignment phase never provisions new capacity to satisfy a node selector, terminating with a no-capacity error instead. --- src/dstack/_internal/core/models/profiles.py | 12 ++ .../pipeline_tasks/jobs_submitted.py | 15 +++ .../_internal/server/services/instances.py | 21 ++++ .../pipeline_tasks/test_submitted_jobs.py | 80 ++++++++++++++ .../_internal/server/routers/test_fleets.py | 3 + .../_internal/server/routers/test_runs.py | 4 + .../server/services/test_instances.py | 104 ++++++++++++++++++ 7 files changed, 239 insertions(+) diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index f1beebd5b9..9f1b454da4 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -391,6 +391,18 @@ class ProfileParams(CoreModel): ), ), ] = None + instances: Annotated[ + Optional[List[str]], + Field( + description=( + "The specific fleet instances (nodes) to consider for reuse." + " Each value matches an instance by its name (e.g. `my-fleet-0`)" + " or by its hostname/IP address." + " When set, the run is only placed on a matching existing instance" + " and no new instances are provisioned" + ), + ), + ] = None tags: Annotated[ Optional[Dict[str, str]], Field( diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 2869374d55..2de32faa6a 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -530,6 +530,11 @@ async def _select_assignment( volumes=preconditions.prepared_job_volumes.volumes, ) + if context.run.run_spec.merged_profile.instances: + # The run targets specific existing instances (nodes). Do not provision new + # capacity to satisfy a node selector that no available instance matches. + return _NoFleetAssignment() + return _NewCapacityAssignment(fleet_id=fleet_model.id) @@ -905,6 +910,16 @@ async def _apply_no_fleet_selection( job_model: JobModel, run: Run, ) -> None: + if run.run_spec.merged_profile.instances: + logger.debug("%s: failed to use specified instances", fmt(job_model)) + await _terminate_submitted_job( + session=session, + job_model=job_model, + reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + message="Failed to use specified instances", + ) + return + if run.run_spec.merged_profile.fleets is not None: logger.debug("%s: failed to use specified fleets", fmt(job_model)) await _terminate_submitted_job( diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index ad48ff1f51..0cc9512110 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -375,6 +375,22 @@ def get_instance_ssh_private_keys(instance_model: InstanceModel) -> tuple[str, O return host_private_key, proxy_private_keys[0] +def instance_matches_selectors(instance: InstanceModel, selectors: List[str]) -> bool: + """ + Check if an instance matches any of the given node selectors. + A selector matches the instance name or its hostname/IP address + (cloud public IP or SSH host). + """ + candidates = {instance.name.lower()} + jpd = get_instance_provisioning_data(instance) + if jpd is not None and jpd.hostname is not None: + candidates.add(jpd.hostname.lower()) + rci = get_instance_remote_connection_info(instance) + if rci is not None: + candidates.add(rci.host.lower()) + return any(selector.lower() in candidates for selector in selectors) + + def instance_matches_constraints( instance: InstanceModel, *, @@ -462,11 +478,16 @@ def filter_instances( regions = [r for r in regions if r == master_job_provisioning_data.region] instance_types = profile.instance_types + instance_selectors = profile.instances filtered_instances: List[InstanceModel] = [] for instance in instances: if instance.unreachable: continue + if instance_selectors is not None and not instance_matches_selectors( + instance, instance_selectors + ): + continue if instance.health.is_failure(): continue if status is not None and instance.status != status: diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index fd9cf3b58a..ee56ae33cb 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -1165,6 +1165,86 @@ async def test_assigns_job_to_specific_fleet( assert job.instance is not None and job.instance.id == instance_2.id assert job.fleet_id == fleet_2.id + async def test_assigns_job_to_specific_instance( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project, name="my-fleet") + await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + instance_num=0, + name="my-fleet-0", + ) + instance_1 = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + instance_num=1, + name="my-fleet-1", + ) + run_spec = get_run_spec( + repo_id=repo.name, + profile=Profile(name="default", instances=["my-fleet-1"]), + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + job = await create_job(session=session, run=run) + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + assert job.instance_assigned + assert job.instance is not None and job.instance.id == instance_1.id + assert job.fleet_id == fleet.id + + async def test_does_not_provision_new_capacity_when_instances_specified( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + # A candidate fleet exists, but no instance matches the selector. + await create_fleet(session=session, project=project) + run_spec = get_run_spec( + repo_id=repo.name, + profile=Profile(name="default", instances=["missing-instance"]), + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + job = await create_job(session=session, run=run) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + m.return_value = [backend_mock] + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.return_value = [offer] + + await _process_job(session=session, worker=worker, job_model=job) + + await session.refresh(job) + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + # No placeholder instance should be created for a specific-instance target. + res = await session.execute(select(InstanceModel)) + assert res.scalars().all() == [] + async def test_assignment_creates_placeholder_instance_for_new_capacity( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 16ce066866..6e231e74bf 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -973,6 +973,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "default": False, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, @@ -1093,6 +1094,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "default": False, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, @@ -1312,6 +1314,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "default": False, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index a46ec93f8d..dc108ce514 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -213,6 +213,7 @@ def get_dev_env_run_plan_dict( "schedule": None, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, "priority": 0, @@ -239,6 +240,7 @@ def get_dev_env_run_plan_dict( "schedule": None, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, @@ -458,6 +460,7 @@ def get_dev_env_run_dict( "schedule": None, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, "priority": 0, @@ -484,6 +487,7 @@ def get_dev_env_run_dict( "schedule": None, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index 1f5e1fb52b..166857e58b 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -26,7 +26,9 @@ create_repo, create_run, create_user, + get_job_provisioning_data, get_kubernetes_volume_configuration, + get_remote_connection_info, get_volume, get_volume_configuration, get_volume_provisioning_data, @@ -202,6 +204,108 @@ async def test_returns_volume_instances_with_az(self, test_db, session: AsyncSes ) assert res == [aws_instance_2] + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + instance_num=0, + name="my-cluster-0", + ) + instance1 = await create_instance( + session=session, + project=project, + instance_num=1, + name="my-cluster-1", + ) + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile(name="test", instances=["my-cluster-1"]), + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name_case_insensitive(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + ) + res = instances_services.filter_instances( + instances=[instance0], + profile=Profile(name="test", instances=["MY-CLUSTER-0"]), + ) + assert res == [instance0] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_hostname(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + job_provisioning_data=get_job_provisioning_data(hostname="10.0.0.7"), + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-cluster-1", + job_provisioning_data=get_job_provisioning_data(hostname="10.0.0.8"), + ) + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile(name="test", instances=["10.0.0.8"]), + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_ssh_host(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + remote_connection_info=get_remote_connection_info(host="192.168.1.10"), + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-cluster-1", + remote_connection_info=get_remote_connection_info(host="192.168.1.11"), + ) + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile(name="test", instances=["192.168.1.11"]), + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_no_instances_selector_returns_all(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance(session=session, project=project, name="my-cluster-0") + instance1 = await create_instance(session=session, project=project, name="my-cluster-1") + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile(name="test", instances=None), + ) + assert res == instances + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_returns_volume_instances_without_region(self, test_db, session: AsyncSession): From 7bf972476b561ddfaccac96001f5c58daf5a1b3d Mon Sep 17 00:00:00 2001 From: fededagos Date: Mon, 1 Jun 2026 11:19:13 -0700 Subject: [PATCH 02/13] Validate node count and hide unusable offers for instances option Reject runs that target fewer instances than the number of nodes they require, surfaced during planning via `validate_run_spec_and_set_defaults`. Exclude new-capacity backend offers from the run plan when `instances` is set, since they are never provisioned and would otherwise mislead the `dstack apply`/`dstack offer` output. --- .../_internal/server/services/runs/plan.py | 4 +- .../_internal/server/services/runs/spec.py | 9 ++++ .../server/services/runs/test_plan.py | 49 +++++++++++++++++++ .../server/services/runs/test_spec.py | 39 ++++++++++++++- 4 files changed, 99 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 942bd1302b..23610cdd44 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -810,7 +810,9 @@ def _get_job_plan( ) -> JobPlan: job_offers: list[InstanceOfferWithAvailability] = [] job_offers.extend(offer for _, offer in instance_offers) - if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE: + # When the run targets specific instances, new capacity is never provisioned, + # so backend offers are not actually usable and must not be shown in the plan. + if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE and not profile.instances: job_offers.extend(offer for _, offer in backend_offers) job_offers.sort(key=lambda offer: not offer.availability.is_available()) remove_job_spec_sensitive_info(job.job_spec) diff --git a/src/dstack/_internal/server/services/runs/spec.py b/src/dstack/_internal/server/services/runs/spec.py index cb989ef5b4..2a22e7162f 100644 --- a/src/dstack/_internal/server/services/runs/spec.py +++ b/src/dstack/_internal/server/services/runs/spec.py @@ -115,6 +115,15 @@ def validate_run_spec_and_set_defaults( raise ServerClientError( f"Probe timeout cannot be longer than {settings.MAX_PROBE_TIMEOUT}s" ) + instances = run_spec.merged_profile.instances + if instances is not None: + nodes_required_num = get_nodes_required_num(run_spec) + if len(instances) < nodes_required_num: + raise ServerClientError( + f"`instances` specifies {len(instances)} instance(s)" + f" but the run requires {nodes_required_num} nodes." + " Specify at least as many instances as nodes." + ) if run_spec.configuration.priority is None: run_spec.configuration.priority = RUN_PRIORITY_DEFAULT # We do not reject top-level `resources` when `replicas` is a list. Adding strict checks diff --git a/src/tests/_internal/server/services/runs/test_plan.py b/src/tests/_internal/server/services/runs/test_plan.py index 5836319bc1..04f4e5210b 100644 --- a/src/tests/_internal/server/services/runs/test_plan.py +++ b/src/tests/_internal/server/services/runs/test_plan.py @@ -7,11 +7,13 @@ from dstack._internal.core.models.configurations import TaskConfiguration from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import InstanceAvailability +from dstack._internal.core.models.profiles import CreationPolicy, Profile from dstack._internal.server.services.jobs import get_jobs_from_run_spec from dstack._internal.server.services.runs.plan import ( _freeze_offer_identity_value, _get_backend_offer_identity, _get_backend_offers_in_fleet, + _get_job_plan, ) from dstack._internal.server.testing.common import ( create_fleet, @@ -66,6 +68,53 @@ def test_get_backend_offer_identity_uses_full_offer_payload(self) -> None: assert _get_backend_offer_identity(offer) != _get_backend_offer_identity(different_offer) +class TestGetJobPlan: + @pytest.mark.asyncio + async def test_includes_backend_offers_by_default(self) -> None: + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(image="debian", commands=["echo"]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + instance_offer = get_instance_offer_with_availability() + backend_offer = get_instance_offer_with_availability() + + job_plan = _get_job_plan( + instance_offers=[(None, instance_offer)], + backend_offers=[(None, backend_offer)], + profile=Profile(name="default", creation_policy=CreationPolicy.REUSE_OR_CREATE), + job=jobs[0], + max_offers=None, + ) + + assert job_plan.total_offers == 2 + + @pytest.mark.asyncio + async def test_excludes_backend_offers_when_instances_specified(self) -> None: + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(image="debian", commands=["echo"]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + instance_offer = get_instance_offer_with_availability() + backend_offer = get_instance_offer_with_availability() + + job_plan = _get_job_plan( + instance_offers=[(None, instance_offer)], + backend_offers=[(None, backend_offer)], + profile=Profile( + name="default", + creation_policy=CreationPolicy.REUSE_OR_CREATE, + instances=["my-fleet-0"], + ), + job=jobs[0], + max_offers=None, + ) + + assert job_plan.total_offers == 1 + assert job_plan.offers == [instance_offer] + + class TestGetBackendOffersInFleet: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) diff --git a/src/tests/_internal/server/services/runs/test_spec.py b/src/tests/_internal/server/services/runs/test_spec.py index 093ca768cf..71a449ed91 100644 --- a/src/tests/_internal/server/services/runs/test_spec.py +++ b/src/tests/_internal/server/services/runs/test_spec.py @@ -5,7 +5,11 @@ import pytest from dstack._internal.core.errors import ServerClientError -from dstack._internal.core.models.configurations import ServiceConfiguration +from dstack._internal.core.models.configurations import ( + DevEnvironmentConfiguration, + ServiceConfiguration, + TaskConfiguration, +) from dstack._internal.core.models.files import FileArchiveMapping from dstack._internal.core.models.profiles import Profile, ProfileRetry from dstack._internal.core.models.repos.local import LocalRunRepoData @@ -98,6 +102,39 @@ def test_rejects_negative_retry_duration_for_new_run_specs(self): ) +class TestValidateRunSpecInstances: + def _user(self): + return SimpleNamespace(ssh_public_key="ssh-rsa test") + + def test_rejects_fewer_instances_than_nodes(self): + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(commands=["echo"], nodes=2), + profile=Profile(name="default", instances=["my-fleet-0"]), + ) + + with pytest.raises(ServerClientError, match="instances"): + validate_run_spec_and_set_defaults(self._user(), run_spec) + + def test_allows_matching_instances_and_nodes(self): + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(commands=["echo"], nodes=2), + profile=Profile(name="default", instances=["my-fleet-0", "my-fleet-1"]), + ) + + validate_run_spec_and_set_defaults(self._user(), run_spec) + + def test_allows_single_node_with_instances(self): + run_spec = get_run_spec( + repo_id="test-repo", + configuration=DevEnvironmentConfiguration(ide="vscode"), + profile=Profile(name="default", instances=["my-fleet-3"]), + ) + + validate_run_spec_and_set_defaults(self._user(), run_spec) + + class TestCheckCanUpdateConfigurationRouterType: def test_sglang_to_dynamo_router_type_change_is_rejected(self): current = _run_spec(_service_configuration(router_type="sglang")) From d35215315831839b8e43f42bc5622e253ef66c92 Mon Sep 17 00:00:00 2001 From: fededagos Date: Mon, 1 Jun 2026 11:19:22 -0700 Subject: [PATCH 03/13] Document targeting specific fleet instances Add a 'Targeting specific instances' section to the shared fleets snippet (dev environments, tasks, services) and a corresponding tip in the protips guide. --- .../docs/concepts/snippets/manage-fleets.ext | 20 +++++++++++ mkdocs/docs/guides/protips.md | 33 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/mkdocs/docs/concepts/snippets/manage-fleets.ext b/mkdocs/docs/concepts/snippets/manage-fleets.ext index b30b4126a8..59635b9789 100644 --- a/mkdocs/docs/concepts/snippets/manage-fleets.ext +++ b/mkdocs/docs/concepts/snippets/manage-fleets.ext @@ -1,3 +1,23 @@ +### Targeting specific instances + +If a fleet has multiple nodes and you want the run to land on a particular node, set `instances` +in the run configuration. Each value matches a fleet instance by its name (e.g. `my-fleet-0`) +or by its hostname/IP address: + +
+ +```yaml +# Only consider these specific fleet instances (nodes) +instances: + - my-fleet-3 +``` + +
+ +When `instances` is set, the run is only placed on a matching existing instance and `dstack` +never provisions new instances. If no matching instance is available, the run fails with a +no-capacity error (use `retry` to wait for a targeted node to free up). + ### Idle duration If the run is submitted to a fleet with `nodes` set to a range and a new instance is provisioned, diff --git a/mkdocs/docs/guides/protips.md b/mkdocs/docs/guides/protips.md index 065556ef11..bfcaabb6e6 100644 --- a/mkdocs/docs/guides/protips.md +++ b/mkdocs/docs/guides/protips.md @@ -208,6 +208,39 @@ $ dstack apply -R -f examples/.dstack.yml Or, set [`creation_policy`](../reference/dstack.yml/dev-environment.md#creation_policy) to `reuse` in the run configuration. +### Targeting specific instances + +If you have a fleet with multiple nodes and want a run to land on a particular node, +set `instances` in the run configuration. Each value matches an instance by its name +(e.g. `my-fleet-0`) or by its hostname/IP address: + +
+ +```yaml +type: dev-environment +name: vscode +ide: vscode + +# Only consider these specific fleet instances (nodes) +instances: + - my-fleet-3 +``` + +
+ +You can list multiple instances and mix names and IP addresses: + +```yaml +instances: + - my-fleet-3 + - 203.0.113.10 +``` + +When `instances` is set, the run is only placed on a matching existing instance and +`dstack` never provisions new instances. If no matching instance is available, the run +fails with a no-capacity error (use [`retry`](../reference/dstack.yml/dev-environment.md#retry) +to wait for a targeted node to free up). + ### Idle duration If the run is submitted to a fleet with `nodes` set to a range and a new instance is provisioned, the shorter of the fleet's and run's `idle_duration` is used. From c28a418c5161245acc3d379ba978d1215543d606 Mon Sep 17 00:00:00 2001 From: fededagos Date: Mon, 1 Jun 2026 12:02:39 -0700 Subject: [PATCH 04/13] Treat empty instances list as targeting and cover multinode/shared paths Handle an explicit empty `instances` list consistently across the assignment gate, plan output, and instance filtering by checking `is not None` instead of truthiness, so an empty list targets existing instances only (rather than silently allowing new-capacity provisioning and showing unusable offers). Add regression tests ensuring the instance selector is applied on the multinode and shared-instances filter paths. --- .../pipeline_tasks/jobs_submitted.py | 5 +- .../_internal/server/services/runs/plan.py | 3 +- .../server/services/runs/test_plan.py | 27 ++++++++++ .../server/services/test_instances.py | 50 +++++++++++++++++++ 4 files changed, 82 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 2de32faa6a..2b6486d517 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -530,9 +530,10 @@ async def _select_assignment( volumes=preconditions.prepared_job_volumes.volumes, ) - if context.run.run_spec.merged_profile.instances: + if context.run.run_spec.merged_profile.instances is not None: # The run targets specific existing instances (nodes). Do not provision new # capacity to satisfy a node selector that no available instance matches. + # `is not None` (not truthiness) so an empty list is also treated as targeting. return _NoFleetAssignment() return _NewCapacityAssignment(fleet_id=fleet_model.id) @@ -910,7 +911,7 @@ async def _apply_no_fleet_selection( job_model: JobModel, run: Run, ) -> None: - if run.run_spec.merged_profile.instances: + if run.run_spec.merged_profile.instances is not None: logger.debug("%s: failed to use specified instances", fmt(job_model)) await _terminate_submitted_job( session=session, diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 23610cdd44..28dd3d47d5 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -812,7 +812,8 @@ def _get_job_plan( job_offers.extend(offer for _, offer in instance_offers) # When the run targets specific instances, new capacity is never provisioned, # so backend offers are not actually usable and must not be shown in the plan. - if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE and not profile.instances: + # `instances is not None` (not truthiness) so an empty list is also treated as targeting. + if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE and profile.instances is None: job_offers.extend(offer for _, offer in backend_offers) job_offers.sort(key=lambda offer: not offer.availability.is_available()) remove_job_spec_sensitive_info(job.job_spec) diff --git a/src/tests/_internal/server/services/runs/test_plan.py b/src/tests/_internal/server/services/runs/test_plan.py index 04f4e5210b..2a78da55dd 100644 --- a/src/tests/_internal/server/services/runs/test_plan.py +++ b/src/tests/_internal/server/services/runs/test_plan.py @@ -114,6 +114,33 @@ async def test_excludes_backend_offers_when_instances_specified(self) -> None: assert job_plan.total_offers == 1 assert job_plan.offers == [instance_offer] + @pytest.mark.asyncio + async def test_excludes_backend_offers_when_instances_empty_list(self) -> None: + # An explicit empty `instances` list must be treated the same as a non-empty + # selector (target existing instances only), not as "no targeting". + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(image="debian", commands=["echo"]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + instance_offer = get_instance_offer_with_availability() + backend_offer = get_instance_offer_with_availability() + + job_plan = _get_job_plan( + instance_offers=[(None, instance_offer)], + backend_offers=[(None, backend_offer)], + profile=Profile( + name="default", + creation_policy=CreationPolicy.REUSE_OR_CREATE, + instances=[], + ), + job=jobs[0], + max_offers=None, + ) + + assert job_plan.total_offers == 1 + assert job_plan.offers == [instance_offer] + class TestGetBackendOffersInFleet: @pytest.mark.asyncio diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index 166857e58b..b561110fda 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -306,6 +306,56 @@ async def test_no_instances_selector_returns_all(self, test_db, session: AsyncSe ) assert res == instances + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name_for_multinode(self, test_db, session: AsyncSession): + # Regression: the selector must also be applied on the multinode filter path. + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + backend=BackendType.AWS, + name="my-fleet-0", + ) + instance1 = await create_instance( + session=session, + project=project, + backend=BackendType.AWS, + name="my-fleet-1", + ) + res = instances_services.filter_instances( + instances=[instance0, instance1], + profile=Profile(name="test", instances=["my-fleet-1"]), + multinode=True, + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name_for_shared(self, test_db, session: AsyncSession): + # Regression: the selector must also be applied on the shared-instances filter path. + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-fleet-0", + total_blocks=2, + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-fleet-1", + total_blocks=2, + ) + res = instances_services.filter_instances( + instances=[instance0, instance1], + profile=Profile(name="test", instances=["my-fleet-1"]), + shared=True, + ) + assert res == [instance1] + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_returns_volume_instances_without_region(self, test_db, session: AsyncSession): From b25e68ab08ee19094e1b8bc1a63fafd3b6d87fca Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Fri, 5 Jun 2026 18:17:43 +0200 Subject: [PATCH 05/13] Support strict instance selectors --- .../docs/concepts/snippets/manage-fleets.ext | 25 ++- mkdocs/docs/guides/protips.md | 29 +-- .../_internal/core/compatibility/common.py | 2 + .../_internal/core/compatibility/runs.py | 2 + src/dstack/_internal/core/models/profiles.py | 58 +++++- .../pipeline_tasks/jobs_submitted.py | 51 ++++- .../_internal/server/services/instances.py | 115 ++++++++++- .../_internal/server/services/runs/plan.py | 145 ++++++++++--- .../_internal/core/compatibility/test_runs.py | 66 ++++++ .../_internal/core/models/test_profiles.py | 56 ++++- .../pipeline_tasks/test_submitted_jobs.py | 68 +++++- .../server/services/runs/test_plan.py | 195 +++++++++++++++--- .../server/services/runs/test_spec.py | 20 +- .../server/services/test_instances.py | 159 +++++++++++++- 14 files changed, 873 insertions(+), 118 deletions(-) create mode 100644 src/tests/_internal/core/compatibility/test_runs.py diff --git a/mkdocs/docs/concepts/snippets/manage-fleets.ext b/mkdocs/docs/concepts/snippets/manage-fleets.ext index 59635b9789..5680e497d7 100644 --- a/mkdocs/docs/concepts/snippets/manage-fleets.ext +++ b/mkdocs/docs/concepts/snippets/manage-fleets.ext @@ -1,22 +1,33 @@ -### Targeting specific instances +### Instances -If a fleet has multiple nodes and you want the run to land on a particular node, set `instances` -in the run configuration. Each value matches a fleet instance by its name (e.g. `my-fleet-0`) -or by its hostname/IP address: +Use `instances` to restrict a run to particular existing fleet instances. You can specify +instance names, hostnames or IP addresses, or a fleet name with an instance number. +For the `fleet` form, use `/` when the fleet belongs to another project:
```yaml -# Only consider these specific fleet instances (nodes) instances: - - my-fleet-3 + - name: my-fleet-3 + - hostname: 203.0.113.10 + - fleet: my-fleet + instance: 3 ```
+??? info "Short syntax" + + To match by instance name, you can use the string shorthand: + + ```yaml + instances: + - my-fleet-3 + ``` + When `instances` is set, the run is only placed on a matching existing instance and `dstack` never provisions new instances. If no matching instance is available, the run fails with a -no-capacity error (use `retry` to wait for a targeted node to free up). +no-capacity error (use `retry` to wait for a targeted instance to free up). ### Idle duration diff --git a/mkdocs/docs/guides/protips.md b/mkdocs/docs/guides/protips.md index bfcaabb6e6..51b20d4f4b 100644 --- a/mkdocs/docs/guides/protips.md +++ b/mkdocs/docs/guides/protips.md @@ -208,11 +208,11 @@ $ dstack apply -R -f examples/.dstack.yml Or, set [`creation_policy`](../reference/dstack.yml/dev-environment.md#creation_policy) to `reuse` in the run configuration. -### Targeting specific instances +### Instances -If you have a fleet with multiple nodes and want a run to land on a particular node, -set `instances` in the run configuration. Each value matches an instance by its name -(e.g. `my-fleet-0`) or by its hostname/IP address: +Use `instances` to restrict a run to particular existing fleet instances. You can specify +instance names, hostnames or IP addresses, or a fleet name with an instance number. +For the `fleet` form, use `/` when the fleet belongs to another project:
@@ -221,25 +221,28 @@ type: dev-environment name: vscode ide: vscode -# Only consider these specific fleet instances (nodes) instances: - - my-fleet-3 + - name: my-fleet-3 + - hostname: 203.0.113.10 + - fleet: my-fleet + instance: 3 ```
-You can list multiple instances and mix names and IP addresses: +??? info "Short syntax" -```yaml -instances: - - my-fleet-3 - - 203.0.113.10 -``` + To match by instance name, you can use the string shorthand: + + ```yaml + instances: + - my-fleet-3 + ``` When `instances` is set, the run is only placed on a matching existing instance and `dstack` never provisions new instances. If no matching instance is available, the run fails with a no-capacity error (use [`retry`](../reference/dstack.yml/dev-environment.md#retry) -to wait for a targeted node to free up). +to wait for a targeted instance to free up). ### Idle duration diff --git a/src/dstack/_internal/core/compatibility/common.py b/src/dstack/_internal/core/compatibility/common.py index 789e2d120a..8a34b40579 100644 --- a/src/dstack/_internal/core/compatibility/common.py +++ b/src/dstack/_internal/core/compatibility/common.py @@ -10,6 +10,8 @@ def get_profile_excludes(profile: Optional[ProfileParams]) -> IncludeExcludeSetT return excludes if profile.backend_options is None: excludes.add("backend_options") + if profile.instances is None: + excludes.add("instances") return excludes diff --git a/src/dstack/_internal/core/compatibility/runs.py b/src/dstack/_internal/core/compatibility/runs.py index cbed73c5bf..6013a940e2 100644 --- a/src/dstack/_internal/core/compatibility/runs.py +++ b/src/dstack/_internal/core/compatibility/runs.py @@ -83,6 +83,8 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType: spec_excludes: IncludeExcludeDictType = {} configuration_excludes: IncludeExcludeDictType = {} profile_excludes = get_profile_excludes(run_spec.profile) + for field in get_profile_excludes(run_spec.configuration): + configuration_excludes[field] = True if run_spec.configuration.backend_options is None: configuration_excludes["backend_options"] = True diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index 9f1b454da4..befb4a6c14 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -234,6 +234,46 @@ def crons(self) -> List[str]: return self.cron +class InstanceNameSelector(CoreModel): + name: Annotated[str, Field(description="The fleet instance name", min_length=1)] + + +class InstanceHostnameSelector(CoreModel): + hostname: Annotated[ + str, Field(description="The fleet instance hostname or IP address", min_length=1) + ] + + +def _validate_fleet_instance_selector_fleet(v: str) -> str: + EntityReference.parse(v) + return v + + +class FleetInstanceSelector(CoreModel): + fleet: Annotated[ + str, + Field( + description=( + "The fleet name. For fleets owned by the current project, specify the fleet name." + " For a fleet from another project, specify `/`" + ), + min_length=1, + ), + ] + instance: Annotated[int, Field(description="The fleet instance number", ge=0)] + + _validate_fleet = validator("fleet", allow_reuse=True)(_validate_fleet_instance_selector_fleet) + + +InstanceSelector = Union[InstanceNameSelector, InstanceHostnameSelector, FleetInstanceSelector] + + +def parse_instance_selector(v: Union[InstanceSelector, str]) -> InstanceSelector: + if isinstance(v, str): + return InstanceNameSelector(name=v) + return v + + class ProfileParamsConfig(CoreConfig): @staticmethod def schema_extra(schema: Dict[str, Any]): @@ -249,6 +289,10 @@ def schema_extra(schema: Dict[str, Any]): schema["properties"]["idle_duration"], extra_types=[{"type": "string"}], ) + add_extra_schema_types( + schema["properties"]["instances"]["items"], + extra_types=[{"type": "string", "minLength": 1}], + ) class ProfileParams(CoreModel): @@ -387,20 +431,21 @@ class ProfileParams(CoreModel): description=( "The fleets considered for reuse." " For fleets owned by the current project, specify fleet names." - " For imported fleets, specify `/`" + " For fleets from another project, specify `/`" ), ), ] = None instances: Annotated[ - Optional[List[str]], + Optional[List[InstanceSelector]], Field( description=( - "The specific fleet instances (nodes) to consider for reuse." - " Each value matches an instance by its name (e.g. `my-fleet-0`)" - " or by its hostname/IP address." + "The specific fleet instances to consider for reuse." + " Each value can be an instance name string (e.g. `my-fleet-0`)" + " or an object with `name`, `hostname`, or `fleet` and `instance`." " When set, the run is only placed on a matching existing instance" " and no new instances are provisioned" ), + min_items=1, ), ] = None tags: Annotated[ @@ -428,6 +473,9 @@ class ProfileParams(CoreModel): parse_idle_duration ) _validate_fleets = validator("fleets", allow_reuse=True, each_item=True)(EntityReference.parse) + _validate_instances = validator("instances", pre=True, allow_reuse=True, each_item=True)( + parse_instance_selector + ) _validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator) _validate_backend_options = validator("backend_options", allow_reuse=True)( validate_backend_options diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 2b6486d517..7decdd5900 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -7,6 +7,7 @@ from typing import Optional, Sequence, Union from sqlalchemy import and_, func, or_, select, update +from sqlalchemy import inspect as sa_inspect from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only, selectinload @@ -97,6 +98,7 @@ get_instance_offer, get_instance_provisioning_data, is_placeholder_instance, + profile_instances_have_qualified_fleet_selector, switch_instance_status, ) from dstack._internal.server.services.jobs import ( @@ -523,7 +525,11 @@ async def _select_assignment( if fleet_model is None: return _NoFleetAssignment() - if fleet_instances_with_offers: + required_instance_offers = 1 + if context.run.run_spec.merged_profile.instances is not None: + required_instance_offers = len(context.jobs_to_provision) + + if len(fleet_instances_with_offers) >= required_instance_offers: return _ExistingInstanceAssignment( fleet_id=fleet_model.id, master_job_provisioning_data=preconditions.master_job_provisioning_data, @@ -531,9 +537,8 @@ async def _select_assignment( ) if context.run.run_spec.merged_profile.instances is not None: - # The run targets specific existing instances (nodes). Do not provision new - # capacity to satisfy a node selector that no available instance matches. - # `is not None` (not truthiness) so an empty list is also treated as targeting. + # The run targets specific existing instances. Do not provision new capacity + # if no selected fleet has enough matching available instances. return _NoFleetAssignment() return _NewCapacityAssignment(fleet_id=fleet_model.id) @@ -649,10 +654,13 @@ async def _apply_assignment_result( assignment=assignment, fleet_model=fleet_model, ) - if not current_instance_offers: - # If the reusable offers vanished under the fleet lock, retry full - # assignment later instead of forcing new-capacity provisioning in a - # fleet that may no longer be optimal. + required_instance_offers = 1 + if context.run.run_spec.merged_profile.instances is not None: + required_instance_offers = len(context.jobs_to_provision) + if len(current_instance_offers) < required_instance_offers: + # If reusable offers vanished or are no longer enough under the fleet lock, + # retry full assignment later instead of forcing new-capacity provisioning + # in a fleet that may no longer be optimal. await _reset_job_lock_for_retry(session=session, item=item) return @@ -699,6 +707,9 @@ async def _load_submitted_job_context( ) job_model = res.unique().scalar_one() run = run_model_to_run(run_model) + if profile_instances_have_qualified_fleet_selector(run.run_spec.merged_profile): + await _load_fleet_project_if_needed(session=session, fleet_model=run_model.fleet) + await _load_fleet_project_if_needed(session=session, fleet_model=job_model.fleet) job = find_job(run.jobs, job_model.replica_num, job_model.job_num) replica_jobs = find_jobs(run.jobs, replica_num=job_model.replica_num) return _SubmittedJobContext( @@ -784,6 +795,20 @@ async def _fetch_run_model_for_submitted_job( return res.unique().scalar_one() +async def _load_fleet_project_if_needed( + session: AsyncSession, + fleet_model: Optional[FleetModel], +) -> None: + if fleet_model is None or "project" not in sa_inspect(fleet_model).unloaded: + return + await session.execute( + select(FleetModel) + .where(FleetModel.id == fleet_model.id) + .options(joinedload(FleetModel.project)) + .execution_options(populate_existing=True) + ) + + def _get_job_models_for_jobs( job_models: list[JobModel], jobs: list[Job], @@ -902,6 +927,9 @@ async def _load_assignment_candidate_fleets( fleet_filters=fleet_filters, instance_filters=instance_filters, lock_instances=False, + load_fleet_project=profile_instances_have_qualified_fleet_selector( + context.run.run_spec.merged_profile + ), ) return fleets_with_instances + fleets_without_instances @@ -965,6 +993,9 @@ async def _lock_assignment_fleet_for_existing_instance_assignment( fleet_filters=fleet_filters, instance_filters=instance_filters, lock_instances=True, + load_fleet_project=profile_instances_have_qualified_fleet_selector( + context.run.run_spec.merged_profile + ), ) if not fleets_with_instances: return None @@ -988,6 +1019,9 @@ async def _lock_assignment_fleet_for_existing_instance_assignment( fleet_filters=fleet_filters, instance_filters=[*instance_filters, InstanceModel.id.in_(instance_ids)], lock_instances=True, + load_fleet_project=profile_instances_have_qualified_fleet_selector( + context.run.run_spec.merged_profile + ), ) if not fleets_with_locked_instances: return None @@ -1063,6 +1097,7 @@ def _get_current_reusable_instance_offers( fleet_model: FleetModel, ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: return get_instance_offers_in_fleet( + project=context.project, fleet_model=fleet_model, run_spec=context.run.run_spec, job=context.job, diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index 0cc9512110..cbb9e54ff4 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -6,6 +6,7 @@ import gpuhunt from sqlalchemy import and_, exists, false, or_, select +from sqlalchemy import inspect as sa_inspect from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, load_only @@ -16,6 +17,7 @@ from dstack._internal.core.backends.features import BACKENDS_WITH_MULTINODE_SUPPORT from dstack._internal.core.errors import ResourceNotExistsError from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.common import EntityReference from dstack._internal.core.models.envs import Env from dstack._internal.core.models.health import HealthCheck, HealthEvent, HealthStatus from dstack._internal.core.models.instances import ( @@ -34,6 +36,10 @@ ) from dstack._internal.core.models.profiles import ( DEFAULT_FLEET_TERMINATION_IDLE_TIME, + FleetInstanceSelector, + InstanceHostnameSelector, + InstanceNameSelector, + InstanceSelector, Profile, TerminationPolicy, ) @@ -375,20 +381,99 @@ def get_instance_ssh_private_keys(instance_model: InstanceModel) -> tuple[str, O return host_private_key, proxy_private_keys[0] -def instance_matches_selectors(instance: InstanceModel, selectors: List[str]) -> bool: +def profile_instances_have_qualified_fleet_selector(profile: Profile) -> bool: + if profile.instances is None: + return False + for selector in profile.instances: + if isinstance(selector, FleetInstanceSelector): + if EntityReference.parse(selector.fleet).project is not None: + return True + return False + + +def instance_matches_selectors( + instance: InstanceModel, + selectors: List[InstanceSelector], + *, + project: Optional[ProjectModel] = None, + fleet: Optional[FleetModel] = None, +) -> bool: """ - Check if an instance matches any of the given node selectors. - A selector matches the instance name or its hostname/IP address - (cloud public IP or SSH host). + Check if an instance matches any of the given instance selectors. """ - candidates = {instance.name.lower()} + return any( + instance_matches_selector( + instance, + selector, + project=project, + fleet=fleet, + ) + for selector in selectors + ) + + +def instance_matches_selector( + instance: InstanceModel, + selector: InstanceSelector, + *, + project: Optional[ProjectModel] = None, + fleet: Optional[FleetModel] = None, +) -> bool: + if isinstance(selector, InstanceNameSelector): + return instance.name.lower() == selector.name.lower() + if isinstance(selector, InstanceHostnameSelector): + return instance_matches_hostname_selector(instance, selector) + if isinstance(selector, FleetInstanceSelector): + return instance_matches_fleet_instance_selector( + instance, + selector, + project=project, + fleet=fleet, + ) + return False + + +def instance_matches_hostname_selector( + instance: InstanceModel, selector: InstanceHostnameSelector +) -> bool: + candidates = set() jpd = get_instance_provisioning_data(instance) if jpd is not None and jpd.hostname is not None: candidates.add(jpd.hostname.lower()) rci = get_instance_remote_connection_info(instance) if rci is not None: candidates.add(rci.host.lower()) - return any(selector.lower() in candidates for selector in selectors) + return selector.hostname.lower() in candidates + + +def instance_matches_fleet_instance_selector( + instance: InstanceModel, + selector: FleetInstanceSelector, + *, + project: Optional[ProjectModel] = None, + fleet: Optional[FleetModel] = None, +) -> bool: + fleet_ref = EntityReference.parse(selector.fleet) + + if fleet is None: + # Avoid triggering a lazy load in async code. + if "fleet" in sa_inspect(instance).unloaded or instance.fleet is None: + return False + fleet = instance.fleet + + if fleet.name.lower() != fleet_ref.name.lower(): + return False + if instance.instance_num != selector.instance: + return False + + if fleet_ref.project is None: + if project is not None and fleet.project_id != project.id: + return False + return True + + if "project" in sa_inspect(fleet).unloaded or fleet.project is None: + return False + return fleet.project.name.lower() == fleet_ref.project.lower() def instance_matches_constraints( @@ -439,6 +524,8 @@ def filter_instances( master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[List[List[Volume]]] = None, shared: bool = False, + project: Optional[ProjectModel] = None, + fleet: Optional[FleetModel] = None, ) -> List[InstanceModel]: backend_types: Optional[list[BackendType]] = profile.backends regions: Optional[list[str]] = profile.regions @@ -485,7 +572,10 @@ def filter_instances( if instance.unreachable: continue if instance_selectors is not None and not instance_matches_selectors( - instance, instance_selectors + instance, + instance_selectors, + project=project, + fleet=fleet, ): continue if instance.health.is_failure(): @@ -529,6 +619,8 @@ def get_shared_instances_with_offers( idle_only: bool = False, multinode: bool = False, volumes: Optional[List[List[Volume]]] = None, + project: Optional[ProjectModel] = None, + fleet: Optional[FleetModel] = None, ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = [] query_filter = requirements_to_query_filter(requirements) @@ -538,6 +630,8 @@ def get_shared_instances_with_offers( multinode=multinode, volumes=volumes, shared=True, + project=project, + fleet=fleet, ) for instance in filtered_instances: if idle_only and instance.status not in [InstanceStatus.IDLE, InstanceStatus.BUSY]: @@ -567,7 +661,12 @@ def get_shared_instances_with_offers( async def get_pool_instances( session: AsyncSession, project: ProjectModel, + *, + load_fleet_project: bool = False, ) -> List[InstanceModel]: + fleet_load = joinedload(InstanceModel.fleet) + if load_fleet_project: + fleet_load = fleet_load.joinedload(FleetModel.project) res = await session.execute( select(InstanceModel) .where( @@ -581,7 +680,7 @@ async def get_pool_instances( ), InstanceModel.deleted == False, ) - .options(joinedload(InstanceModel.fleet)) + .options(fleet_load) ) instance_models = list(res.unique().scalars().all()) return instance_models diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 28dd3d47d5..ed5780f440 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -16,7 +16,11 @@ InstanceOfferWithAvailability, InstanceStatus, ) -from dstack._internal.core.models.profiles import CreationPolicy, Profile +from dstack._internal.core.models.profiles import ( + CreationPolicy, + FleetInstanceSelector, + Profile, +) from dstack._internal.core.models.runs import ( Job, JobPlan, @@ -45,6 +49,7 @@ get_pool_instances, get_shared_instances_with_offers, is_placeholder_instance, + profile_instances_have_qualified_fleet_selector, ) from dstack._internal.server.services.jobs import ( get_instances_ids_with_detaching_volumes, @@ -210,44 +215,92 @@ async def get_run_candidate_fleet_models_filters( if run_spec.merged_profile.fleets is not None: fleet_conditions = [] for ref in map(EntityReference.parse, run_spec.merged_profile.fleets): - if ref.project is None: - fleet_conditions.append( - and_( - FleetModel.name == ref.name, - FleetModel.project_id == project.id, - ) - ) - else: - fleet_conditions.append( - and_( - FleetModel.name == ref.name, - ProjectModel.name == ref.project, - ) - ) + fleet_conditions.append(_get_fleet_reference_condition(project=project, ref=ref)) fleet_filters.append(or_(*fleet_conditions)) instance_filters = [ InstanceModel.deleted == False, InstanceModel.id.not_in(detaching_instances_ids), ] + fleet_instance_selectors = _get_only_fleet_instance_selectors(run_spec.merged_profile) + if fleet_instance_selectors is not None: + # Name and hostname selectors are checked after instances are loaded. + fleet_filters.append( + or_( + *[ + _get_fleet_reference_condition( + project=project, + ref=EntityReference.parse(selector.fleet), + ) + for selector in fleet_instance_selectors + ] + ) + ) + instance_filters.append( + or_( + *[ + and_( + _get_fleet_reference_condition( + project=project, + ref=EntityReference.parse(selector.fleet), + ), + InstanceModel.instance_num == selector.instance, + ) + for selector in fleet_instance_selectors + ] + ) + ) return fleet_filters, instance_filters +def _get_fleet_reference_condition(project: ProjectModel, ref: EntityReference): + if ref.project is None: + return and_( + FleetModel.name == ref.name, + FleetModel.project_id == project.id, + ) + return and_( + FleetModel.name == ref.name, + ProjectModel.name == ref.project, + ) + + +def _get_only_fleet_instance_selectors( + profile: Profile, +) -> Optional[list[FleetInstanceSelector]]: + if profile.instances is None: + return None + fleet_instance_selectors = [] + for selector in profile.instances: + if not isinstance(selector, FleetInstanceSelector): + return None + fleet_instance_selectors.append(selector) + if not fleet_instance_selectors: + return None + return fleet_instance_selectors + + async def select_run_candidate_fleet_models_with_filters( session: AsyncSession, fleet_filters: list, instance_filters: list, lock_instances: bool, + load_fleet_project: bool = False, ) -> tuple[list[FleetModel], list[FleetModel]]: # Selecting fleets in two queries since Postgres does not allow # locking nullable side of an outer join. So, first lock instances with inner join. # Then select left out fleets without instances. + with_instances_options = [contains_eager(FleetModel.instances)] + without_instances_options = [noload(FleetModel.instances)] + if load_fleet_project: + with_instances_options.append(contains_eager(FleetModel.project)) + without_instances_options.append(contains_eager(FleetModel.project)) stmt = ( select(FleetModel) .join(FleetModel.project) # can be referenced by fleet_filters .join(FleetModel.instances) .where(*fleet_filters) .where(*instance_filters) - .options(contains_eager(FleetModel.instances)) + .options(*with_instances_options) .execution_options(populate_existing=True) ) if lock_instances: @@ -274,7 +327,7 @@ async def select_run_candidate_fleet_models_with_filters( not_(and_(*instance_filters)), ) ) - .options(noload(FleetModel.instances)) + .options(*without_instances_options) .execution_options(populate_existing=True) ) fleet_models_without_instances = list(res.unique().scalars().all()) @@ -323,6 +376,7 @@ async def find_optimal_fleet_with_offers( if run_model is not None and run_model.fleet is not None: # Using the fleet that was already chosen by the master job instance_offers = get_instance_offers_in_fleet( + project=project, fleet_model=run_model.fleet, run_spec=run_spec, job=job, @@ -358,6 +412,7 @@ async def find_optimal_fleet_with_offers( continue all_instance_offers = get_instance_offers_in_fleet( + project=project, fleet_model=fleet_model, run_spec=run_spec, job=job, @@ -382,10 +437,12 @@ async def find_optimal_fleet_with_offers( ) ) - # If any candidate fleet has pool capacity, the optimal fleet will be one of - # those, so backend offers from any fleet won't affect selection — skip them entirely when allowed. - skip_backend_offers = skip_backend_offers_on_pool_capacity and any( - candidate.has_pool_capacity for candidate in candidates + # If `instances` is set, backend offers cannot satisfy the run. Otherwise, + # keep the existing optimization that skips backend requests when pool + # capacity is already enough. + skip_backend_offers = run_spec.merged_profile.instances is not None or ( + skip_backend_offers_on_pool_capacity + and any(candidate.has_pool_capacity for candidate in candidates) ) # Second step: gather backend offers unless skipped. @@ -478,6 +535,9 @@ async def _select_candidate_fleet_models( fleet_filters=fleet_filters, instance_filters=instance_filters, lock_instances=False, + load_fleet_project=profile_instances_have_qualified_fleet_selector( + run_spec.merged_profile + ), ) return fleet_models_with_instances + fleet_models_without_instances @@ -486,6 +546,8 @@ def get_instance_offers_in_fleet( fleet_model: FleetModel, run_spec: RunSpec, job: Job, + *, + project: Optional[ProjectModel] = None, master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[list[list[Volume]]] = None, exclude_not_available: bool = False, @@ -500,6 +562,8 @@ def get_instance_offers_in_fleet( master_job_provisioning_data=master_job_provisioning_data, volumes=volumes, shared=False, + project=project, + fleet=fleet_model, ) instances_with_offers = _get_offers_from_instances(nonshared_instances) shared_instances_with_offers = get_shared_instances_with_offers( @@ -508,6 +572,8 @@ def get_instance_offers_in_fleet( requirements=job.job_spec.requirements, multinode=multinode, volumes=volumes, + project=project, + fleet=fleet_model, ) instances_with_offers.extend(shared_instances_with_offers) instances_with_offers.sort(key=lambda o: o[0].price or 0) @@ -610,7 +676,13 @@ async def _get_pool_offers( ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: pool_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = [] detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) - pool_instances = await get_pool_instances(session, project) + pool_instances = await get_pool_instances( + session, + project, + load_fleet_project=profile_instances_have_qualified_fleet_selector( + run_spec.merged_profile + ), + ) pool_instances = [i for i in pool_instances if i.id not in detaching_instances_ids] multinode = is_multinode_job(job) shared_instances_with_offers = get_shared_instances_with_offers( @@ -619,6 +691,7 @@ async def _get_pool_offers( requirements=job.job_spec.requirements, volumes=volumes, multinode=multinode, + project=project, ) for offer in shared_instances_with_offers: pool_offers.append(offer) @@ -630,6 +703,7 @@ async def _get_pool_offers( multinode=multinode, volumes=volumes, shared=False, + project=project, ) nonshared_instances_with_offers = _get_offers_from_instances(nonshared_instances) pool_offers.extend(nonshared_instances_with_offers) @@ -659,16 +733,18 @@ async def _get_non_fleet_offers( job=job, volumes=volumes, ) - backend_offers = await get_offers_by_requirements( - project=project, - profile=profile, - requirements=job.job_spec.requirements, - exclude_not_available=False, - multinode=is_multinode_job(job), - volumes=volumes, - privileged=job.job_spec.privileged, - instance_mounts=check_run_spec_requires_instance_mounts(run_spec), - ) + backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]] = [] + if profile.instances is None: + backend_offers = await get_offers_by_requirements( + project=project, + profile=profile, + requirements=job.job_spec.requirements, + exclude_not_available=False, + multinode=is_multinode_job(job), + volumes=volumes, + privileged=job.job_spec.privileged, + instance_mounts=check_run_spec_requires_instance_mounts(run_spec), + ) return instance_offers, backend_offers @@ -687,6 +763,9 @@ async def get_backend_offers_in_run_candidate_fleets( It resolves the selected fleets from `run_spec`, requests backend offers in each fleet, merges them, and deduplicates identical backend offers across fleets. """ + if run_spec.merged_profile.instances is not None: + return [] + candidate_fleet_models = await _select_candidate_fleet_models( session=session, project=project, @@ -741,6 +820,7 @@ async def _get_offers_in_run_candidate_fleets( for candidate_fleet_model in candidate_fleet_models: instance_offers.extend( get_instance_offers_in_fleet( + project=project, fleet_model=candidate_fleet_model, run_spec=run_spec, job=job, @@ -812,7 +892,6 @@ def _get_job_plan( job_offers.extend(offer for _, offer in instance_offers) # When the run targets specific instances, new capacity is never provisioned, # so backend offers are not actually usable and must not be shown in the plan. - # `instances is not None` (not truthiness) so an empty list is also treated as targeting. if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE and profile.instances is None: job_offers.extend(offer for _, offer in backend_offers) job_offers.sort(key=lambda offer: not offer.availability.is_available()) diff --git a/src/tests/_internal/core/compatibility/test_runs.py b/src/tests/_internal/core/compatibility/test_runs.py new file mode 100644 index 0000000000..516b15d512 --- /dev/null +++ b/src/tests/_internal/core/compatibility/test_runs.py @@ -0,0 +1,66 @@ +from dstack._internal.core.compatibility.fleets import get_fleet_spec_excludes +from dstack._internal.core.compatibility.runs import get_run_spec_excludes +from dstack._internal.core.models.configurations import TaskConfiguration +from dstack._internal.core.models.fleets import FleetConfiguration, FleetSpec +from dstack._internal.core.models.profiles import InstanceNameSelector, Profile +from dstack._internal.core.models.runs import RunSpec + + +class TestGetRunSpecExcludes: + def test_excludes_unset_instances_for_old_servers(self): + run_spec = RunSpec( + configuration=TaskConfiguration(commands=["echo"]), + profile=Profile(name="default"), + ) + + excludes = get_run_spec_excludes(run_spec) + + assert excludes["configuration"]["instances"] is True + assert "instances" in excludes["profile"] + + def test_keeps_configuration_instances_when_set(self): + run_spec = RunSpec( + configuration=TaskConfiguration( + commands=["echo"], + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + profile=Profile(name="default"), + ) + + excludes = get_run_spec_excludes(run_spec) + + assert "instances" not in excludes["configuration"] + assert "instances" in excludes["profile"] + + def test_keeps_profile_instances_when_set(self): + run_spec = RunSpec( + configuration=TaskConfiguration(commands=["echo"]), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + ) + + excludes = get_run_spec_excludes(run_spec) + + assert excludes["configuration"]["instances"] is True + assert "instances" not in excludes["profile"] + + +class TestGetFleetSpecExcludes: + def test_excludes_unset_profile_instances_for_old_servers(self): + spec = FleetSpec(configuration=FleetConfiguration(), profile=Profile()) + + excludes = get_fleet_spec_excludes(spec) + + assert excludes is not None + assert "instances" in excludes["profile"] + + def test_keeps_profile_instances_when_set(self): + spec = FleetSpec(configuration=FleetConfiguration(), profile=Profile()) + spec.profile.instances = [InstanceNameSelector(name="my-fleet-0")] + + excludes = get_fleet_spec_excludes(spec) + + assert excludes is not None + assert "instances" not in excludes["profile"] diff --git a/src/tests/_internal/core/models/test_profiles.py b/src/tests/_internal/core/models/test_profiles.py index 4a1caf8bf8..d7aaa7fc74 100644 --- a/src/tests/_internal/core/models/test_profiles.py +++ b/src/tests/_internal/core/models/test_profiles.py @@ -2,7 +2,12 @@ from pydantic import ValidationError from dstack._internal.core.backends.vastai.profile_options import VastAIProfileOptions -from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.profiles import ( + FleetInstanceSelector, + InstanceHostnameSelector, + InstanceNameSelector, + Profile, +) class TestValidateProfileBackendOptions: @@ -27,3 +32,52 @@ def test_none_backend_options_is_valid(self): def test_empty_list_backend_options_is_valid(self): profile = Profile(backend_options=[]) assert profile.backend_options == [] + + +class TestProfileInstances: + def test_string_is_parsed_as_instance_name_selector(self): + profile = Profile.parse_obj({"instances": ["my-fleet-1"]}) + + assert profile.instances == [InstanceNameSelector(name="my-fleet-1")] + + @pytest.mark.parametrize( + ("value", "expected"), + [ + ({"name": "my-fleet-1"}, InstanceNameSelector(name="my-fleet-1")), + ({"hostname": "worker-1"}, InstanceHostnameSelector(hostname="worker-1")), + ( + {"fleet": "my-fleet", "instance": 3}, + FleetInstanceSelector(fleet="my-fleet", instance=3), + ), + ( + {"fleet": "other-project/my-fleet", "instance": 3}, + FleetInstanceSelector(fleet="other-project/my-fleet", instance=3), + ), + ], + ) + def test_object_selectors_are_parsed(self, value, expected): + profile = Profile.parse_obj({"instances": [value]}) + + assert profile.instances == [expected] + + @pytest.mark.parametrize( + "value", + [ + "", + {"name": "my-fleet-1", "hostname": "worker-1"}, + {"name": ""}, + {"hostname": ""}, + {"fleet": "", "instance": 0}, + {"fleet": "project/name/extra", "instance": 0}, + {"fleet": "my-fleet"}, + {"fleet": "my-fleet", "instance": -1}, + {"hostname": "worker-1", "extra": "value"}, + ], + ) + def test_invalid_object_selector_is_rejected(self, value): + with pytest.raises(ValidationError): + Profile.parse_obj({"instances": [value]}) + + def test_empty_instances_list_is_rejected(self): + with pytest.raises(ValidationError): + Profile.parse_obj({"instances": []}) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index ee56ae33cb..2a22a4b8bd 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -17,7 +17,7 @@ from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import InstanceStatus from dstack._internal.core.models.placement import PlacementGroup -from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.profiles import InstanceNameSelector, Profile from dstack._internal.core.models.runs import JobStatus, JobTerminationReason from dstack._internal.core.models.users import GlobalRole from dstack._internal.core.models.volumes import ( @@ -1190,7 +1190,7 @@ async def test_assigns_job_to_specific_instance( ) run_spec = get_run_spec( repo_id=repo.name, - profile=Profile(name="default", instances=["my-fleet-1"]), + profile=Profile(name="default", instances=[InstanceNameSelector(name="my-fleet-1")]), ) run = await create_run( session=session, @@ -1208,6 +1208,65 @@ async def test_assigns_job_to_specific_instance( assert job.instance is not None and job.instance.id == instance_1.id assert job.fleet_id == fleet.id + async def test_does_not_assign_targeted_multinode_job_without_enough_instances( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project, name="my-fleet") + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + instance_num=0, + name="my-fleet-0", + ) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", nodes=2), + profile=Profile( + name="default", + instances=[ + InstanceNameSelector(name="my-fleet-0"), + InstanceNameSelector(name="my-fleet-1"), + ], + ), + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + master_job = await create_job( + session=session, + run=run, + job_num=0, + waiting_master_job=False, + ) + await create_job( + session=session, + run=run, + job_num=1, + waiting_master_job=True, + ) + + await _process_job(session=session, worker=worker, job_model=master_job) + + master_job = await _get_job(session, master_job.id) + await session.refresh(instance) + assert master_job.status == JobStatus.TERMINATING + assert ( + master_job.termination_reason + == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + ) + assert not master_job.instance_assigned + assert master_job.instance is None + assert instance.status == InstanceStatus.IDLE + async def test_does_not_provision_new_capacity_when_instances_specified( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): @@ -1218,7 +1277,10 @@ async def test_does_not_provision_new_capacity_when_instances_specified( await create_fleet(session=session, project=project) run_spec = get_run_spec( repo_id=repo.name, - profile=Profile(name="default", instances=["missing-instance"]), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="missing-instance")], + ), ) run = await create_run( session=session, diff --git a/src/tests/_internal/server/services/runs/test_plan.py b/src/tests/_internal/server/services/runs/test_plan.py index 2a78da55dd..7c3eb628d1 100644 --- a/src/tests/_internal/server/services/runs/test_plan.py +++ b/src/tests/_internal/server/services/runs/test_plan.py @@ -7,15 +7,25 @@ from dstack._internal.core.models.configurations import TaskConfiguration from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import InstanceAvailability -from dstack._internal.core.models.profiles import CreationPolicy, Profile +from dstack._internal.core.models.profiles import ( + CreationPolicy, + FleetInstanceSelector, + InstanceNameSelector, + Profile, +) from dstack._internal.server.services.jobs import get_jobs_from_run_spec from dstack._internal.server.services.runs.plan import ( _freeze_offer_identity_value, _get_backend_offer_identity, _get_backend_offers_in_fleet, _get_job_plan, + find_optimal_fleet_with_offers, + get_backend_offers_in_run_candidate_fleets, + get_run_candidate_fleet_models_filters, + select_run_candidate_fleet_models_with_filters, ) from dstack._internal.server.testing.common import ( + create_export, create_fleet, create_instance, create_project, @@ -105,34 +115,7 @@ async def test_excludes_backend_offers_when_instances_specified(self) -> None: profile=Profile( name="default", creation_policy=CreationPolicy.REUSE_OR_CREATE, - instances=["my-fleet-0"], - ), - job=jobs[0], - max_offers=None, - ) - - assert job_plan.total_offers == 1 - assert job_plan.offers == [instance_offer] - - @pytest.mark.asyncio - async def test_excludes_backend_offers_when_instances_empty_list(self) -> None: - # An explicit empty `instances` list must be treated the same as a non-empty - # selector (target existing instances only), not as "no targeting". - run_spec = get_run_spec( - repo_id="test-repo", - configuration=TaskConfiguration(image="debian", commands=["echo"]), - ) - jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) - instance_offer = get_instance_offer_with_availability() - backend_offer = get_instance_offer_with_availability() - - job_plan = _get_job_plan( - instance_offers=[(None, instance_offer)], - backend_offers=[(None, backend_offer)], - profile=Profile( - name="default", - creation_policy=CreationPolicy.REUSE_OR_CREATE, - instances=[], + instances=[InstanceNameSelector(name="my-fleet-0")], ), job=jobs[0], max_offers=None, @@ -189,3 +172,157 @@ async def test_keeps_unconstrained_offers_for_non_empty_cluster_fleet_without_el get_offers_by_requirements_mock.await_args.kwargs["master_job_provisioning_data"] is None ) + + +class TestSelectRunCandidateFleetModelsWithFilters: + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @pytest.mark.parametrize( + ("selector", "expected_fleet_project_name"), + [ + ("same-fleet", "importer-project"), + ("exporter-project/same-fleet", "exporter-project"), + ], + ) + async def test_fleet_instance_selector_narrows_candidate_fleets( + self, + test_db, + session: AsyncSession, + selector: str, + expected_fleet_project_name: str, + ) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user, name="importer-project") + exporter_project = await create_project( + session=session, owner=user, name="exporter-project" + ) + local_fleet = await create_fleet(session=session, project=project, name="same-fleet") + exported_fleet = await create_fleet( + session=session, project=exporter_project, name="same-fleet" + ) + unrelated_fleet = await create_fleet( + session=session, project=project, name="unrelated-fleet" + ) + await create_instance( + session=session, + project=project, + fleet=local_fleet, + instance_num=1, + ) + await create_instance( + session=session, + project=exporter_project, + fleet=exported_fleet, + instance_num=1, + ) + await create_instance( + session=session, + project=project, + fleet=unrelated_fleet, + instance_num=1, + ) + await create_export( + session=session, + exporter_project=exporter_project, + importer_projects=[project], + exported_fleets=[exported_fleet], + ) + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(image="debian", commands=["echo"]), + profile=Profile(instances=[FleetInstanceSelector(fleet=selector, instance=1)]), + ) + fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters( + session=session, + project=project, + run_model=None, + run_spec=run_spec, + ) + + ( + fleets_with_instances, + fleets_without_instances, + ) = await select_run_candidate_fleet_models_with_filters( + session=session, + fleet_filters=fleet_filters, + instance_filters=instance_filters, + lock_instances=False, + load_fleet_project=True, + ) + + assert [fleet.project.name for fleet in fleets_with_instances] == [ + expected_fleet_project_name + ] + assert fleets_without_instances == [] + + +class TestFindOptimalFleetWithOffers: + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_skips_backend_offers_when_instances_specified( + self, test_db, session: AsyncSession, monkeypatch: pytest.MonkeyPatch + ) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", commands=["echo"]), + profile=Profile(instances=[InstanceNameSelector(name="missing-instance")]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + get_backend_offers_in_fleet_mock = AsyncMock() + monkeypatch.setattr( + "dstack._internal.server.services.runs.plan._get_backend_offers_in_fleet", + get_backend_offers_in_fleet_mock, + ) + + fleet_model, instance_offers, backend_offers = await find_optimal_fleet_with_offers( + project=project, + fleet_models=[fleet], + run_model=None, + run_spec=run_spec, + job=jobs[0], + master_job_provisioning_data=None, + volumes=None, + exclude_not_available=False, + ) + + assert fleet_model == fleet + assert instance_offers == [] + assert backend_offers == [] + get_backend_offers_in_fleet_mock.assert_not_awaited() + + +class TestGetBackendOffersInRunCandidateFleets: + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_skips_backend_offers_when_instances_specified( + self, test_db, session: AsyncSession, monkeypatch: pytest.MonkeyPatch + ) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", commands=["echo"]), + profile=Profile(instances=[InstanceNameSelector(name="missing-instance")]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + select_candidate_fleet_models_mock = AsyncMock() + monkeypatch.setattr( + "dstack._internal.server.services.runs.plan._select_candidate_fleet_models", + select_candidate_fleet_models_mock, + ) + + offers = await get_backend_offers_in_run_candidate_fleets( + session=session, + project=project, + run_spec=run_spec, + job=jobs[0], + volumes=None, + ) + + assert offers == [] + select_candidate_fleet_models_mock.assert_not_awaited() diff --git a/src/tests/_internal/server/services/runs/test_spec.py b/src/tests/_internal/server/services/runs/test_spec.py index 71a449ed91..08a662e78e 100644 --- a/src/tests/_internal/server/services/runs/test_spec.py +++ b/src/tests/_internal/server/services/runs/test_spec.py @@ -11,7 +11,7 @@ TaskConfiguration, ) from dstack._internal.core.models.files import FileArchiveMapping -from dstack._internal.core.models.profiles import Profile, ProfileRetry +from dstack._internal.core.models.profiles import InstanceNameSelector, Profile, ProfileRetry from dstack._internal.core.models.repos.local import LocalRunRepoData from dstack._internal.core.models.runs import RunSpec from dstack._internal.server.services.runs.spec import ( @@ -110,7 +110,10 @@ def test_rejects_fewer_instances_than_nodes(self): run_spec = get_run_spec( repo_id="test-repo", configuration=TaskConfiguration(commands=["echo"], nodes=2), - profile=Profile(name="default", instances=["my-fleet-0"]), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-0")], + ), ) with pytest.raises(ServerClientError, match="instances"): @@ -120,7 +123,13 @@ def test_allows_matching_instances_and_nodes(self): run_spec = get_run_spec( repo_id="test-repo", configuration=TaskConfiguration(commands=["echo"], nodes=2), - profile=Profile(name="default", instances=["my-fleet-0", "my-fleet-1"]), + profile=Profile( + name="default", + instances=[ + InstanceNameSelector(name="my-fleet-0"), + InstanceNameSelector(name="my-fleet-1"), + ], + ), ) validate_run_spec_and_set_defaults(self._user(), run_spec) @@ -129,7 +138,10 @@ def test_allows_single_node_with_instances(self): run_spec = get_run_spec( repo_id="test-repo", configuration=DevEnvironmentConfiguration(ide="vscode"), - profile=Profile(name="default", instances=["my-fleet-3"]), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-3")], + ), ) validate_run_spec_and_set_defaults(self._user(), run_spec) diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index b561110fda..7192c17d69 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -2,6 +2,7 @@ from unittest.mock import Mock, call import pytest +from sqlalchemy import inspect as sa_inspect from sqlalchemy.ext.asyncio import AsyncSession import dstack._internal.server.services.instances as instances_services @@ -14,12 +15,19 @@ InstanceType, Resources, ) -from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.profiles import ( + FleetInstanceSelector, + InstanceHostnameSelector, + InstanceNameSelector, + Profile, +) from dstack._internal.core.models.runs import JobStatus from dstack._internal.server.models import InstanceModel from dstack._internal.server.schemas.runner import TaskListItem, TaskListResponse, TaskStatus from dstack._internal.server.services.runner.client import ShimClient from dstack._internal.server.testing.common import ( + create_export, + create_fleet, create_instance, create_job, create_project, @@ -224,7 +232,7 @@ async def test_filters_by_instance_name(self, test_db, session: AsyncSession): instances = [instance0, instance1] res = instances_services.filter_instances( instances=instances, - profile=Profile(name="test", instances=["my-cluster-1"]), + profile=Profile(name="test", instances=[InstanceNameSelector(name="my-cluster-1")]), ) assert res == [instance1] @@ -240,7 +248,7 @@ async def test_filters_by_instance_name_case_insensitive(self, test_db, session: ) res = instances_services.filter_instances( instances=[instance0], - profile=Profile(name="test", instances=["MY-CLUSTER-0"]), + profile=Profile(name="test", instances=[InstanceNameSelector(name="MY-CLUSTER-0")]), ) assert res == [instance0] @@ -264,10 +272,30 @@ async def test_filters_by_hostname(self, test_db, session: AsyncSession): instances = [instance0, instance1] res = instances_services.filter_instances( instances=instances, - profile=Profile(name="test", instances=["10.0.0.8"]), + profile=Profile( + name="test", + instances=[InstanceHostnameSelector(hostname="10.0.0.8")], + ), ) assert res == [instance1] + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_string_selector_does_not_match_hostname(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance = await create_instance( + session=session, + project=project, + name="my-cluster-0", + job_provisioning_data=get_job_provisioning_data(hostname="10.0.0.8"), + ) + res = instances_services.filter_instances( + instances=[instance], + profile=Profile.parse_obj({"name": "test", "instances": ["10.0.0.8"]}), + ) + assert res == [] + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_filters_by_ssh_host(self, test_db, session: AsyncSession): @@ -288,10 +316,127 @@ async def test_filters_by_ssh_host(self, test_db, session: AsyncSession): instances = [instance0, instance1] res = instances_services.filter_instances( instances=instances, - profile=Profile(name="test", instances=["192.168.1.11"]), + profile=Profile( + name="test", + instances=[InstanceHostnameSelector(hostname="192.168.1.11")], + ), + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_fleet_and_instance_number(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project, name="my-fleet") + instance0 = await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=0, + name="worker-a", + ) + instance1 = await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=1, + name="worker-b", + ) + res = instances_services.filter_instances( + instances=[instance0, instance1], + profile=Profile( + name="test", + instances=[FleetInstanceSelector(fleet="my-fleet", instance=1)], + ), ) assert res == [instance1] + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_fleet_and_instance_number_without_loading_instance_fleet( + self, test_db, session: AsyncSession + ): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project, name="my-fleet") + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=1, + name="worker-b", + ) + session.expire(instance, ["fleet"]) + assert "fleet" in sa_inspect(instance).unloaded + + res = instances_services.filter_instances( + instances=[instance], + profile=Profile( + name="test", + instances=[FleetInstanceSelector(fleet="my-fleet", instance=1)], + ), + fleet=fleet, + ) + assert res == [instance] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @pytest.mark.parametrize( + ("selector", "expected_instance_name"), + [ + ("same-fleet", "local-worker"), + ("exporter-project/same-fleet", "exported-worker"), + ], + ) + async def test_fleet_selector_respects_project_reference( + self, + test_db, + session: AsyncSession, + selector: str, + expected_instance_name: str, + ): + user = await create_user(session=session) + project = await create_project(session=session, owner=user, name="importer-project") + exporter_project = await create_project( + session=session, owner=user, name="exporter-project" + ) + local_fleet = await create_fleet(session=session, project=project, name="same-fleet") + exported_fleet = await create_fleet( + session=session, project=exporter_project, name="same-fleet" + ) + local_instance = await create_instance( + session=session, + project=project, + fleet=local_fleet, + instance_num=1, + name="local-worker", + ) + exported_instance = await create_instance( + session=session, + project=exporter_project, + fleet=exported_fleet, + instance_num=1, + name="exported-worker", + ) + await create_export( + session=session, + exporter_project=exporter_project, + importer_projects=[project], + exported_fleets=[exported_fleet], + ) + + res = instances_services.filter_instances( + instances=[local_instance, exported_instance], + profile=Profile( + name="test", + instances=[FleetInstanceSelector(fleet=selector, instance=1)], + ), + project=project, + ) + + assert [i.name for i in res] == [expected_instance_name] + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_no_instances_selector_returns_all(self, test_db, session: AsyncSession): @@ -326,7 +471,7 @@ async def test_filters_by_instance_name_for_multinode(self, test_db, session: As ) res = instances_services.filter_instances( instances=[instance0, instance1], - profile=Profile(name="test", instances=["my-fleet-1"]), + profile=Profile(name="test", instances=[InstanceNameSelector(name="my-fleet-1")]), multinode=True, ) assert res == [instance1] @@ -351,7 +496,7 @@ async def test_filters_by_instance_name_for_shared(self, test_db, session: Async ) res = instances_services.filter_instances( instances=[instance0, instance1], - profile=Profile(name="test", instances=["my-fleet-1"]), + profile=Profile(name="test", instances=[InstanceNameSelector(name="my-fleet-1")]), shared=True, ) assert res == [instance1] From 068ca7530744ba1559b23e7145cd6d78df594845 Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 12:05:39 -0700 Subject: [PATCH 06/13] Describe current behavior in skip_backend_offers comment --- src/dstack/_internal/server/services/runs/plan.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index ed5780f440..bfdfd10dad 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -437,9 +437,11 @@ async def find_optimal_fleet_with_offers( ) ) - # If `instances` is set, backend offers cannot satisfy the run. Otherwise, - # keep the existing optimization that skips backend requests when pool - # capacity is already enough. + # Backend offers cannot satisfy a run that targets specific `instances`, + # so they are always skipped for such runs. Otherwise, if any candidate + # fleet has pool capacity, the optimal fleet will be one of those, so + # backend offers from any fleet won't affect selection — skip them + # entirely when allowed. skip_backend_offers = run_spec.merged_profile.instances is not None or ( skip_backend_offers_on_pool_capacity and any(candidate.has_pool_capacity for candidate in candidates) From 3f47cfcdb1265d58df8d36f96f4f41668ca350d8 Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 12:06:28 -0700 Subject: [PATCH 07/13] Match instance internal IP in hostname selector --- .../_internal/server/services/instances.py | 7 +++-- .../server/services/test_instances.py | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index cbb9e54ff4..ae79e18b74 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -438,8 +438,11 @@ def instance_matches_hostname_selector( ) -> bool: candidates = set() jpd = get_instance_provisioning_data(instance) - if jpd is not None and jpd.hostname is not None: - candidates.add(jpd.hostname.lower()) + if jpd is not None: + if jpd.hostname is not None: + candidates.add(jpd.hostname.lower()) + if jpd.internal_ip is not None: + candidates.add(jpd.internal_ip.lower()) rci = get_instance_remote_connection_info(instance) if rci is not None: candidates.add(rci.host.lower()) diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index 7192c17d69..1b95c2d607 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -279,6 +279,36 @@ async def test_filters_by_hostname(self, test_db, session: AsyncSession): ) assert res == [instance1] + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_internal_ip(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + job_provisioning_data=get_job_provisioning_data( + hostname="203.0.113.7", internal_ip="10.0.0.7" + ), + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-cluster-1", + job_provisioning_data=get_job_provisioning_data( + hostname="203.0.113.8", internal_ip="10.0.0.8" + ), + ) + res = instances_services.filter_instances( + instances=[instance0, instance1], + profile=Profile( + name="test", + instances=[InstanceHostnameSelector(hostname="10.0.0.8")], + ), + ) + assert res == [instance1] + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_string_selector_does_not_match_hostname(self, test_db, session: AsyncSession): From 99a62dfce878f95add8e5d158bc582e1e38cb007 Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 12:15:44 -0700 Subject: [PATCH 08/13] Annotate FleetInstanceSelector.fleet as EntityReference --- src/dstack/_internal/core/models/profiles.py | 28 +++++++++++++------ .../_internal/server/services/instances.py | 5 ++-- .../_internal/server/services/runs/plan.py | 4 +-- .../_internal/core/models/test_profiles.py | 26 +++++++++++++++++ 4 files changed, 50 insertions(+), 13 deletions(-) diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index befb4a6c14..bc3e5aa86e 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -244,25 +244,37 @@ class InstanceHostnameSelector(CoreModel): ] -def _validate_fleet_instance_selector_fleet(v: str) -> str: - EntityReference.parse(v) +def _parse_fleet_instance_selector_fleet(v: Any) -> Any: + if isinstance(v, str): + return EntityReference.parse(v) return v -class FleetInstanceSelector(CoreModel): +class FleetInstanceSelectorConfig(CoreConfig): + @staticmethod + def schema_extra(schema: Dict[str, Any]): + add_extra_schema_types( + schema["properties"]["fleet"], + extra_types=[{"type": "string", "minLength": 1}], + ) + + +class FleetInstanceSelector(generate_dual_core_model(FleetInstanceSelectorConfig)): fleet: Annotated[ - str, + EntityReference, Field( description=( - "The fleet name. For fleets owned by the current project, specify the fleet name." - " For a fleet from another project, specify `/`" + "The fleet reference. For fleets owned by the current project, specify" + " the fleet name. For a fleet from another project, specify" + " `/` or an object with `project` and `name`" ), - min_length=1, ), ] instance: Annotated[int, Field(description="The fleet instance number", ge=0)] - _validate_fleet = validator("fleet", allow_reuse=True)(_validate_fleet_instance_selector_fleet) + _validate_fleet = validator("fleet", pre=True, allow_reuse=True)( + _parse_fleet_instance_selector_fleet + ) InstanceSelector = Union[InstanceNameSelector, InstanceHostnameSelector, FleetInstanceSelector] diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index ae79e18b74..ec57995b83 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -17,7 +17,6 @@ from dstack._internal.core.backends.features import BACKENDS_WITH_MULTINODE_SUPPORT from dstack._internal.core.errors import ResourceNotExistsError from dstack._internal.core.models.backends.base import BackendType -from dstack._internal.core.models.common import EntityReference from dstack._internal.core.models.envs import Env from dstack._internal.core.models.health import HealthCheck, HealthEvent, HealthStatus from dstack._internal.core.models.instances import ( @@ -386,7 +385,7 @@ def profile_instances_have_qualified_fleet_selector(profile: Profile) -> bool: return False for selector in profile.instances: if isinstance(selector, FleetInstanceSelector): - if EntityReference.parse(selector.fleet).project is not None: + if selector.fleet.project is not None: return True return False @@ -456,7 +455,7 @@ def instance_matches_fleet_instance_selector( project: Optional[ProjectModel] = None, fleet: Optional[FleetModel] = None, ) -> bool: - fleet_ref = EntityReference.parse(selector.fleet) + fleet_ref = selector.fleet if fleet is None: # Avoid triggering a lazy load in async code. diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index bfdfd10dad..3f851b4aa3 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -229,7 +229,7 @@ async def get_run_candidate_fleet_models_filters( *[ _get_fleet_reference_condition( project=project, - ref=EntityReference.parse(selector.fleet), + ref=selector.fleet, ) for selector in fleet_instance_selectors ] @@ -241,7 +241,7 @@ async def get_run_candidate_fleet_models_filters( and_( _get_fleet_reference_condition( project=project, - ref=EntityReference.parse(selector.fleet), + ref=selector.fleet, ), InstanceModel.instance_num == selector.instance, ) diff --git a/src/tests/_internal/core/models/test_profiles.py b/src/tests/_internal/core/models/test_profiles.py index d7aaa7fc74..b9f600c279 100644 --- a/src/tests/_internal/core/models/test_profiles.py +++ b/src/tests/_internal/core/models/test_profiles.py @@ -2,6 +2,7 @@ from pydantic import ValidationError from dstack._internal.core.backends.vastai.profile_options import VastAIProfileOptions +from dstack._internal.core.models.common import EntityReference from dstack._internal.core.models.profiles import ( FleetInstanceSelector, InstanceHostnameSelector, @@ -81,3 +82,28 @@ def test_invalid_object_selector_is_rejected(self, value): def test_empty_instances_list_is_rejected(self): with pytest.raises(ValidationError): Profile.parse_obj({"instances": []}) + + def test_parses_fleet_selector_string_to_entity_reference(self): + profile = Profile.parse_obj( + {"name": "test", "instances": [{"fleet": "main/my-fleet", "instance": 0}]} + ) + assert profile.instances == [ + FleetInstanceSelector( + fleet=EntityReference(project="main", name="my-fleet"), instance=0 + ) + ] + + def test_parses_fleet_selector_object_notation(self): + profile = Profile.parse_obj( + { + "name": "test", + "instances": [ + {"fleet": {"project": "main", "name": "my-fleet"}, "instance": 0} + ], + } + ) + assert profile.instances == [ + FleetInstanceSelector( + fleet=EntityReference(project="main", name="my-fleet"), instance=0 + ) + ] From f6a7c17a84fec2a25ad31a7a69665783478ba5ab Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 12:22:29 -0700 Subject: [PATCH 09/13] Test invalid fleet selector references are rejected --- src/tests/_internal/core/models/test_profiles.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/tests/_internal/core/models/test_profiles.py b/src/tests/_internal/core/models/test_profiles.py index b9f600c279..d1f190f085 100644 --- a/src/tests/_internal/core/models/test_profiles.py +++ b/src/tests/_internal/core/models/test_profiles.py @@ -107,3 +107,10 @@ def test_parses_fleet_selector_object_notation(self): fleet=EntityReference(project="main", name="my-fleet"), instance=0 ) ] + + @pytest.mark.parametrize("fleet", ["", "a/b/c", "/my-fleet", "my-project/"]) + def test_rejects_invalid_fleet_selector_reference(self, fleet: str): + with pytest.raises(ValidationError): + Profile.parse_obj( + {"name": "test", "instances": [{"fleet": fleet, "instance": 0}]} + ) From 257cef6cce0f1078f64ba61d2fbece7a2d88782b Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 12:30:46 -0700 Subject: [PATCH 10/13] Require loaded relationships for instance selector matching --- .../pipeline_tasks/jobs_submitted.py | 42 +++------ .../_internal/server/services/instances.py | 89 +++++++------------ .../_internal/server/services/runs/plan.py | 31 +++---- .../server/services/runs/test_plan.py | 1 - .../server/services/test_instances.py | 42 ++++----- 5 files changed, 77 insertions(+), 128 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 7decdd5900..1b24dabba4 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -7,7 +7,6 @@ from typing import Optional, Sequence, Union from sqlalchemy import and_, func, or_, select, update -from sqlalchemy import inspect as sa_inspect from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only, selectinload @@ -98,7 +97,7 @@ get_instance_offer, get_instance_provisioning_data, is_placeholder_instance, - profile_instances_have_qualified_fleet_selector, + populate_instances_fleet, switch_instance_status, ) from dstack._internal.server.services.jobs import ( @@ -703,13 +702,18 @@ async def _load_submitted_job_context( FleetModel.instances.and_(InstanceModel.deleted == False) ) ) + .options( + joinedload(JobModel.fleet) + .joinedload(FleetModel.project) + .load_only(ProjectModel.name) + ) .execution_options(populate_existing=True) ) job_model = res.unique().scalar_one() run = run_model_to_run(run_model) - if profile_instances_have_qualified_fleet_selector(run.run_spec.merged_profile): - await _load_fleet_project_if_needed(session=session, fleet_model=run_model.fleet) - await _load_fleet_project_if_needed(session=session, fleet_model=job_model.fleet) + for fleet_model in (run_model.fleet, job_model.fleet): + if fleet_model is not None: + populate_instances_fleet(fleet_model) job = find_job(run.jobs, job_model.replica_num, job_model.job_num) replica_jobs = find_jobs(run.jobs, replica_num=job_model.replica_num) return _SubmittedJobContext( @@ -789,26 +793,17 @@ async def _fetch_run_model_for_submitted_job( FleetModel.instances.and_(InstanceModel.deleted == False) ) ) + .options( + joinedload(RunModel.fleet) + .joinedload(FleetModel.project) + .load_only(ProjectModel.name) + ) .options(contains_eager(RunModel.jobs, alias=job_alias)) .execution_options(populate_existing=True) ) return res.unique().scalar_one() -async def _load_fleet_project_if_needed( - session: AsyncSession, - fleet_model: Optional[FleetModel], -) -> None: - if fleet_model is None or "project" not in sa_inspect(fleet_model).unloaded: - return - await session.execute( - select(FleetModel) - .where(FleetModel.id == fleet_model.id) - .options(joinedload(FleetModel.project)) - .execution_options(populate_existing=True) - ) - - def _get_job_models_for_jobs( job_models: list[JobModel], jobs: list[Job], @@ -927,9 +922,6 @@ async def _load_assignment_candidate_fleets( fleet_filters=fleet_filters, instance_filters=instance_filters, lock_instances=False, - load_fleet_project=profile_instances_have_qualified_fleet_selector( - context.run.run_spec.merged_profile - ), ) return fleets_with_instances + fleets_without_instances @@ -993,9 +985,6 @@ async def _lock_assignment_fleet_for_existing_instance_assignment( fleet_filters=fleet_filters, instance_filters=instance_filters, lock_instances=True, - load_fleet_project=profile_instances_have_qualified_fleet_selector( - context.run.run_spec.merged_profile - ), ) if not fleets_with_instances: return None @@ -1019,9 +1008,6 @@ async def _lock_assignment_fleet_for_existing_instance_assignment( fleet_filters=fleet_filters, instance_filters=[*instance_filters, InstanceModel.id.in_(instance_ids)], lock_instances=True, - load_fleet_project=profile_instances_have_qualified_fleet_selector( - context.run.run_spec.merged_profile - ), ) if not fleets_with_locked_instances: return None diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index ec57995b83..f084372648 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -6,9 +6,9 @@ import gpuhunt from sqlalchemy import and_, exists, false, or_, select -from sqlalchemy import inspect as sa_inspect from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, load_only +from sqlalchemy.orm.attributes import set_committed_value from dstack._internal.core.backends.base.offers import ( offer_to_catalog_item, @@ -380,33 +380,21 @@ def get_instance_ssh_private_keys(instance_model: InstanceModel) -> tuple[str, O return host_private_key, proxy_private_keys[0] -def profile_instances_have_qualified_fleet_selector(profile: Profile) -> bool: - if profile.instances is None: - return False - for selector in profile.instances: - if isinstance(selector, FleetInstanceSelector): - if selector.fleet.project is not None: - return True - return False - - def instance_matches_selectors( instance: InstanceModel, selectors: List[InstanceSelector], *, - project: Optional[ProjectModel] = None, - fleet: Optional[FleetModel] = None, + project: ProjectModel, ) -> bool: """ Check if an instance matches any of the given instance selectors. + + Unqualified fleet references are interpreted as fleets of `project`. + `instance.fleet` (and `fleet.project` for project-qualified references) + must be loaded. """ return any( - instance_matches_selector( - instance, - selector, - project=project, - fleet=fleet, - ) + instance_matches_selector(instance, selector, project=project) for selector in selectors ) @@ -415,20 +403,14 @@ def instance_matches_selector( instance: InstanceModel, selector: InstanceSelector, *, - project: Optional[ProjectModel] = None, - fleet: Optional[FleetModel] = None, + project: ProjectModel, ) -> bool: if isinstance(selector, InstanceNameSelector): return instance.name.lower() == selector.name.lower() if isinstance(selector, InstanceHostnameSelector): return instance_matches_hostname_selector(instance, selector) if isinstance(selector, FleetInstanceSelector): - return instance_matches_fleet_instance_selector( - instance, - selector, - project=project, - fleet=fleet, - ) + return _instance_matches_fleet_instance_selector(instance, selector, project=project) return False @@ -448,34 +430,32 @@ def instance_matches_hostname_selector( return selector.hostname.lower() in candidates -def instance_matches_fleet_instance_selector( +def _instance_matches_fleet_instance_selector( instance: InstanceModel, selector: FleetInstanceSelector, *, - project: Optional[ProjectModel] = None, - fleet: Optional[FleetModel] = None, + project: ProjectModel, ) -> bool: - fleet_ref = selector.fleet - + fleet = instance.fleet if fleet is None: - # Avoid triggering a lazy load in async code. - if "fleet" in sa_inspect(instance).unloaded or instance.fleet is None: - return False - fleet = instance.fleet - - if fleet.name.lower() != fleet_ref.name.lower(): + return False + if fleet.name.lower() != selector.fleet.name.lower(): return False if instance.instance_num != selector.instance: return False + if selector.fleet.project is None: + return fleet.project_id == project.id + return fleet.project.name.lower() == selector.fleet.project.lower() - if fleet_ref.project is None: - if project is not None and fleet.project_id != project.id: - return False - return True - if "project" in sa_inspect(fleet).unloaded or fleet.project is None: - return False - return fleet.project.name.lower() == fleet_ref.project.lower() +def populate_instances_fleet(fleet_model: FleetModel) -> None: + """ + Set `instance.fleet` for instances fetched through `FleetModel.instances`. + SQLAlchemy does not populate the reverse many-to-one on load, and instance + selector matching requires it to be loaded. + """ + for instance in fleet_model.instances: + set_committed_value(instance, "fleet", fleet_model) def instance_matches_constraints( @@ -527,7 +507,6 @@ def filter_instances( volumes: Optional[List[List[Volume]]] = None, shared: bool = False, project: Optional[ProjectModel] = None, - fleet: Optional[FleetModel] = None, ) -> List[InstanceModel]: backend_types: Optional[list[BackendType]] = profile.backends regions: Optional[list[str]] = profile.regions @@ -568,6 +547,8 @@ def filter_instances( instance_types = profile.instance_types instance_selectors = profile.instances + if instance_selectors is not None and project is None: + raise ValueError("project must be provided when profile.instances is set") filtered_instances: List[InstanceModel] = [] for instance in instances: @@ -576,8 +557,7 @@ def filter_instances( if instance_selectors is not None and not instance_matches_selectors( instance, instance_selectors, - project=project, - fleet=fleet, + project=common_utils.get_or_error(project), ): continue if instance.health.is_failure(): @@ -622,7 +602,6 @@ def get_shared_instances_with_offers( multinode: bool = False, volumes: Optional[List[List[Volume]]] = None, project: Optional[ProjectModel] = None, - fleet: Optional[FleetModel] = None, ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = [] query_filter = requirements_to_query_filter(requirements) @@ -633,7 +612,6 @@ def get_shared_instances_with_offers( volumes=volumes, shared=True, project=project, - fleet=fleet, ) for instance in filtered_instances: if idle_only and instance.status not in [InstanceStatus.IDLE, InstanceStatus.BUSY]: @@ -663,12 +641,7 @@ def get_shared_instances_with_offers( async def get_pool_instances( session: AsyncSession, project: ProjectModel, - *, - load_fleet_project: bool = False, ) -> List[InstanceModel]: - fleet_load = joinedload(InstanceModel.fleet) - if load_fleet_project: - fleet_load = fleet_load.joinedload(FleetModel.project) res = await session.execute( select(InstanceModel) .where( @@ -682,7 +655,11 @@ async def get_pool_instances( ), InstanceModel.deleted == False, ) - .options(fleet_load) + .options( + joinedload(InstanceModel.fleet) + .joinedload(FleetModel.project) + .load_only(ProjectModel.name) + ) ) instance_models = list(res.unique().scalars().all()) return instance_models diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 3f851b4aa3..ffbfcbf7d8 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -49,7 +49,7 @@ get_pool_instances, get_shared_instances_with_offers, is_placeholder_instance, - profile_instances_have_qualified_fleet_selector, + populate_instances_fleet, ) from dstack._internal.server.services.jobs import ( get_instances_ids_with_detaching_volumes, @@ -284,16 +284,18 @@ async def select_run_candidate_fleet_models_with_filters( fleet_filters: list, instance_filters: list, lock_instances: bool, - load_fleet_project: bool = False, ) -> tuple[list[FleetModel], list[FleetModel]]: # Selecting fleets in two queries since Postgres does not allow # locking nullable side of an outer join. So, first lock instances with inner join. # Then select left out fleets without instances. - with_instances_options = [contains_eager(FleetModel.instances)] - without_instances_options = [noload(FleetModel.instances)] - if load_fleet_project: - with_instances_options.append(contains_eager(FleetModel.project)) - without_instances_options.append(contains_eager(FleetModel.project)) + with_instances_options = [ + contains_eager(FleetModel.instances), + contains_eager(FleetModel.project).load_only(ProjectModel.name), + ] + without_instances_options = [ + noload(FleetModel.instances), + contains_eager(FleetModel.project).load_only(ProjectModel.name), + ] stmt = ( select(FleetModel) .join(FleetModel.project) # can be referenced by fleet_filters @@ -331,6 +333,8 @@ async def select_run_candidate_fleet_models_with_filters( .execution_options(populate_existing=True) ) fleet_models_without_instances = list(res.unique().scalars().all()) + for fleet_model in fleet_models_with_instances: + populate_instances_fleet(fleet_model) return fleet_models_with_instances, fleet_models_without_instances @@ -537,9 +541,6 @@ async def _select_candidate_fleet_models( fleet_filters=fleet_filters, instance_filters=instance_filters, lock_instances=False, - load_fleet_project=profile_instances_have_qualified_fleet_selector( - run_spec.merged_profile - ), ) return fleet_models_with_instances + fleet_models_without_instances @@ -565,7 +566,6 @@ def get_instance_offers_in_fleet( volumes=volumes, shared=False, project=project, - fleet=fleet_model, ) instances_with_offers = _get_offers_from_instances(nonshared_instances) shared_instances_with_offers = get_shared_instances_with_offers( @@ -575,7 +575,6 @@ def get_instance_offers_in_fleet( multinode=multinode, volumes=volumes, project=project, - fleet=fleet_model, ) instances_with_offers.extend(shared_instances_with_offers) instances_with_offers.sort(key=lambda o: o[0].price or 0) @@ -678,13 +677,7 @@ async def _get_pool_offers( ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: pool_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = [] detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) - pool_instances = await get_pool_instances( - session, - project, - load_fleet_project=profile_instances_have_qualified_fleet_selector( - run_spec.merged_profile - ), - ) + pool_instances = await get_pool_instances(session, project) pool_instances = [i for i in pool_instances if i.id not in detaching_instances_ids] multinode = is_multinode_job(job) shared_instances_with_offers = get_shared_instances_with_offers( diff --git a/src/tests/_internal/server/services/runs/test_plan.py b/src/tests/_internal/server/services/runs/test_plan.py index 7c3eb628d1..e1be5410bc 100644 --- a/src/tests/_internal/server/services/runs/test_plan.py +++ b/src/tests/_internal/server/services/runs/test_plan.py @@ -247,7 +247,6 @@ async def test_fleet_instance_selector_narrows_candidate_fleets( fleet_filters=fleet_filters, instance_filters=instance_filters, lock_instances=False, - load_fleet_project=True, ) assert [fleet.project.name for fleet in fleets_with_instances] == [ diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index 1b95c2d607..b35005ed00 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -2,7 +2,6 @@ from unittest.mock import Mock, call import pytest -from sqlalchemy import inspect as sa_inspect from sqlalchemy.ext.asyncio import AsyncSession import dstack._internal.server.services.instances as instances_services @@ -233,6 +232,7 @@ async def test_filters_by_instance_name(self, test_db, session: AsyncSession): res = instances_services.filter_instances( instances=instances, profile=Profile(name="test", instances=[InstanceNameSelector(name="my-cluster-1")]), + project=project, ) assert res == [instance1] @@ -249,6 +249,7 @@ async def test_filters_by_instance_name_case_insensitive(self, test_db, session: res = instances_services.filter_instances( instances=[instance0], profile=Profile(name="test", instances=[InstanceNameSelector(name="MY-CLUSTER-0")]), + project=project, ) assert res == [instance0] @@ -276,6 +277,7 @@ async def test_filters_by_hostname(self, test_db, session: AsyncSession): name="test", instances=[InstanceHostnameSelector(hostname="10.0.0.8")], ), + project=project, ) assert res == [instance1] @@ -306,6 +308,7 @@ async def test_filters_by_internal_ip(self, test_db, session: AsyncSession): name="test", instances=[InstanceHostnameSelector(hostname="10.0.0.8")], ), + project=project, ) assert res == [instance1] @@ -323,6 +326,7 @@ async def test_string_selector_does_not_match_hostname(self, test_db, session: A res = instances_services.filter_instances( instances=[instance], profile=Profile.parse_obj({"name": "test", "instances": ["10.0.0.8"]}), + project=project, ) assert res == [] @@ -350,6 +354,7 @@ async def test_filters_by_ssh_host(self, test_db, session: AsyncSession): name="test", instances=[InstanceHostnameSelector(hostname="192.168.1.11")], ), + project=project, ) assert res == [instance1] @@ -379,36 +384,23 @@ async def test_filters_by_fleet_and_instance_number(self, test_db, session: Asyn name="test", instances=[FleetInstanceSelector(fleet="my-fleet", instance=1)], ), + project=project, ) assert res == [instance1] @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_filters_by_fleet_and_instance_number_without_loading_instance_fleet( - self, test_db, session: AsyncSession - ): + async def test_selectors_require_project(self, test_db, session: AsyncSession): user = await create_user(session=session) project = await create_project(session=session, owner=user) - fleet = await create_fleet(session=session, project=project, name="my-fleet") - instance = await create_instance( - session=session, - project=project, - fleet=fleet, - instance_num=1, - name="worker-b", - ) - session.expire(instance, ["fleet"]) - assert "fleet" in sa_inspect(instance).unloaded - - res = instances_services.filter_instances( - instances=[instance], - profile=Profile( - name="test", - instances=[FleetInstanceSelector(fleet="my-fleet", instance=1)], - ), - fleet=fleet, - ) - assert res == [instance] + instance = await create_instance(session=session, project=project) + with pytest.raises(ValueError, match="project"): + instances_services.filter_instances( + instances=[instance], + profile=Profile( + name="test", instances=[InstanceNameSelector(name="instance")] + ), + ) @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @@ -503,6 +495,7 @@ async def test_filters_by_instance_name_for_multinode(self, test_db, session: As instances=[instance0, instance1], profile=Profile(name="test", instances=[InstanceNameSelector(name="my-fleet-1")]), multinode=True, + project=project, ) assert res == [instance1] @@ -528,6 +521,7 @@ async def test_filters_by_instance_name_for_shared(self, test_db, session: Async instances=[instance0, instance1], profile=Profile(name="test", instances=[InstanceNameSelector(name="my-fleet-1")]), shared=True, + project=project, ) assert res == [instance1] From 1d84eb1db7d25bbfe32a7a6da42e4810fe1de734 Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 12:41:54 -0700 Subject: [PATCH 11/13] Apply instances node-count validation only to multinode tasks --- src/dstack/_internal/server/services/runs/spec.py | 5 ++++- .../_internal/server/services/runs/test_spec.py | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/services/runs/spec.py b/src/dstack/_internal/server/services/runs/spec.py index 2a22e7162f..66e91053b8 100644 --- a/src/dstack/_internal/server/services/runs/spec.py +++ b/src/dstack/_internal/server/services/runs/spec.py @@ -116,7 +116,10 @@ def validate_run_spec_and_set_defaults( f"Probe timeout cannot be longer than {settings.MAX_PROBE_TIMEOUT}s" ) instances = run_spec.merged_profile.instances - if instances is not None: + # Only multinode tasks require a dedicated instance per node. Service + # replicas may share one instance with enough idle blocks, so they are + # not validated against the instance count. + if instances is not None and run_spec.configuration.type == "task": nodes_required_num = get_nodes_required_num(run_spec) if len(instances) < nodes_required_num: raise ServerClientError( diff --git a/src/tests/_internal/server/services/runs/test_spec.py b/src/tests/_internal/server/services/runs/test_spec.py index 08a662e78e..d9901121c9 100644 --- a/src/tests/_internal/server/services/runs/test_spec.py +++ b/src/tests/_internal/server/services/runs/test_spec.py @@ -146,6 +146,20 @@ def test_allows_single_node_with_instances(self): validate_run_spec_and_set_defaults(self._user(), run_spec) + def test_allows_fewer_instances_than_service_replicas(self): + # Service replicas can share a multi-block instance, so the + # instance-count check applies only to multinode tasks. + run_spec = get_run_spec( + repo_id="test-repo", + configuration=ServiceConfiguration(commands=["echo"], port=8000, replicas=2), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + ) + + validate_run_spec_and_set_defaults(self._user(), run_spec) + class TestCheckCanUpdateConfigurationRouterType: def test_sglang_to_dynamo_router_type_change_is_rejected(self): From 2aa4324276153c190a7213af69154881259764f5 Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 12:46:28 -0700 Subject: [PATCH 12/13] Document instances in .dstack.yml references --- .../reference/dstack.yml/dev-environment.md | 29 +++++++++++++++++++ mkdocs/docs/reference/dstack.yml/service.md | 29 +++++++++++++++++++ mkdocs/docs/reference/dstack.yml/task.md | 29 +++++++++++++++++++ 3 files changed, 87 insertions(+) diff --git a/mkdocs/docs/reference/dstack.yml/dev-environment.md b/mkdocs/docs/reference/dstack.yml/dev-environment.md index 594b6e65d7..ca1ec03c00 100644 --- a/mkdocs/docs/reference/dstack.yml/dev-environment.md +++ b/mkdocs/docs/reference/dstack.yml/dev-environment.md @@ -34,6 +34,35 @@ The `dev-environment` configuration type allows running [dev environments](../.. type: required: true +### `instances[n]` { #_instances data-toc-label="instances" } + +=== "Instance name" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector + overrides: + show_root_heading: false + +=== "Hostname" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector + overrides: + show_root_heading: false + +=== "Fleet instance" + + #SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector + overrides: + show_root_heading: false + +??? info "Short syntax" + + The short syntax for `instances` is a list of instance names: + + ```yaml + instances: + - my-fleet-0 + ``` + ### `resources` #SCHEMA# dstack._internal.core.models.resources.ResourcesSpec diff --git a/mkdocs/docs/reference/dstack.yml/service.md b/mkdocs/docs/reference/dstack.yml/service.md index 5ddfe46dd1..c93e5b27a3 100644 --- a/mkdocs/docs/reference/dstack.yml/service.md +++ b/mkdocs/docs/reference/dstack.yml/service.md @@ -114,6 +114,35 @@ The `service` configuration type allows running [services](../../concepts/servic type: required: true +### `instances[n]` { #_instances data-toc-label="instances" } + +=== "Instance name" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector + overrides: + show_root_heading: false + +=== "Hostname" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector + overrides: + show_root_heading: false + +=== "Fleet instance" + + #SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector + overrides: + show_root_heading: false + +??? info "Short syntax" + + The short syntax for `instances` is a list of instance names: + + ```yaml + instances: + - my-fleet-0 + ``` + ### `resources` #SCHEMA# dstack._internal.core.models.resources.ResourcesSpec diff --git a/mkdocs/docs/reference/dstack.yml/task.md b/mkdocs/docs/reference/dstack.yml/task.md index 96d05c325d..8091e985d1 100644 --- a/mkdocs/docs/reference/dstack.yml/task.md +++ b/mkdocs/docs/reference/dstack.yml/task.md @@ -34,6 +34,35 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md). type: required: true +### `instances[n]` { #_instances data-toc-label="instances" } + +=== "Instance name" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector + overrides: + show_root_heading: false + +=== "Hostname" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector + overrides: + show_root_heading: false + +=== "Fleet instance" + + #SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector + overrides: + show_root_heading: false + +??? info "Short syntax" + + The short syntax for `instances` is a list of instance names: + + ```yaml + instances: + - my-fleet-0 + ``` + ### `resources` #SCHEMA# dstack._internal.core.models.resources.ResourcesSpec From 90d8af4df713054a80e5f25b0d1aeda29ed889c5 Mon Sep 17 00:00:00 2001 From: fededagos Date: Wed, 10 Jun 2026 15:48:42 -0700 Subject: [PATCH 13/13] Format code with ruff format --- .../server/background/pipeline_tasks/jobs_submitted.py | 8 ++------ src/dstack/_internal/server/services/instances.py | 3 +-- src/tests/_internal/core/models/test_profiles.py | 8 ++------ src/tests/_internal/server/services/test_instances.py | 4 +--- 4 files changed, 6 insertions(+), 17 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 1b24dabba4..4afa4601e8 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -703,9 +703,7 @@ async def _load_submitted_job_context( ) ) .options( - joinedload(JobModel.fleet) - .joinedload(FleetModel.project) - .load_only(ProjectModel.name) + joinedload(JobModel.fleet).joinedload(FleetModel.project).load_only(ProjectModel.name) ) .execution_options(populate_existing=True) ) @@ -794,9 +792,7 @@ async def _fetch_run_model_for_submitted_job( ) ) .options( - joinedload(RunModel.fleet) - .joinedload(FleetModel.project) - .load_only(ProjectModel.name) + joinedload(RunModel.fleet).joinedload(FleetModel.project).load_only(ProjectModel.name) ) .options(contains_eager(RunModel.jobs, alias=job_alias)) .execution_options(populate_existing=True) diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index f084372648..bf15c8c903 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -394,8 +394,7 @@ def instance_matches_selectors( must be loaded. """ return any( - instance_matches_selector(instance, selector, project=project) - for selector in selectors + instance_matches_selector(instance, selector, project=project) for selector in selectors ) diff --git a/src/tests/_internal/core/models/test_profiles.py b/src/tests/_internal/core/models/test_profiles.py index d1f190f085..326f3eb31f 100644 --- a/src/tests/_internal/core/models/test_profiles.py +++ b/src/tests/_internal/core/models/test_profiles.py @@ -97,9 +97,7 @@ def test_parses_fleet_selector_object_notation(self): profile = Profile.parse_obj( { "name": "test", - "instances": [ - {"fleet": {"project": "main", "name": "my-fleet"}, "instance": 0} - ], + "instances": [{"fleet": {"project": "main", "name": "my-fleet"}, "instance": 0}], } ) assert profile.instances == [ @@ -111,6 +109,4 @@ def test_parses_fleet_selector_object_notation(self): @pytest.mark.parametrize("fleet", ["", "a/b/c", "/my-fleet", "my-project/"]) def test_rejects_invalid_fleet_selector_reference(self, fleet: str): with pytest.raises(ValidationError): - Profile.parse_obj( - {"name": "test", "instances": [{"fleet": fleet, "instance": 0}]} - ) + Profile.parse_obj({"name": "test", "instances": [{"fleet": fleet, "instance": 0}]}) diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index b35005ed00..f5b8a69290 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -397,9 +397,7 @@ async def test_selectors_require_project(self, test_db, session: AsyncSession): with pytest.raises(ValueError, match="project"): instances_services.filter_instances( instances=[instance], - profile=Profile( - name="test", instances=[InstanceNameSelector(name="instance")] - ), + profile=Profile(name="test", instances=[InstanceNameSelector(name="instance")]), ) @pytest.mark.asyncio