Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions mkdocs/docs/reference/dstack.yml/dev-environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,40 @@ The `dev-environment` configuration type allows running [dev environments](../..
type:
required: true

### `instances[n]` { #_instances data-toc-label="instances" }

When `instances` is set, the run is placed only on matching existing fleet instances.

=== "By name"

#SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector
overrides:
show_root_heading: false
type:
required: true

=== "By hostname"

#SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector
overrides:
show_root_heading: false
type:
required: true

=== "By fleet and instance number"

#SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector
overrides:
show_root_heading: false
type:
required: true

??? info "Short syntax"

The short syntax for instances is an instance name string.

* `my-fleet-1`, same as `{name: my-fleet-1}`

### `resources`

#SCHEMA# dstack._internal.core.models.resources.ResourcesSpec
Expand Down
34 changes: 34 additions & 0 deletions mkdocs/docs/reference/dstack.yml/service.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,40 @@ The `service` configuration type allows running [services](../../concepts/servic
type:
required: true

### `instances[n]` { #_instances data-toc-label="instances" }

When `instances` is set, the run is placed only on matching existing fleet instances.

=== "By name"

#SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector
overrides:
show_root_heading: false
type:
required: true

=== "By hostname"

#SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector
overrides:
show_root_heading: false
type:
required: true

=== "By fleet and instance number"

#SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector
overrides:
show_root_heading: false
type:
required: true

??? info "Short syntax"

The short syntax for instances is an instance name string.

* `my-fleet-1`, same as `{name: my-fleet-1}`

### `resources`

#SCHEMA# dstack._internal.core.models.resources.ResourcesSpec
Expand Down
34 changes: 34 additions & 0 deletions mkdocs/docs/reference/dstack.yml/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,40 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md).
type:
required: true

### `instances[n]` { #_instances data-toc-label="instances" }

When `instances` is set, the run is placed only on matching existing fleet instances.

=== "By name"

#SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector
overrides:
show_root_heading: false
type:
required: true

=== "By hostname"

#SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector
overrides:
show_root_heading: false
type:
required: true

=== "By fleet and instance number"

#SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector
overrides:
show_root_heading: false
type:
required: true

??? info "Short syntax"

The short syntax for instances is an instance name string.

* `my-fleet-1`, same as `{name: my-fleet-1}`

### `resources`

#SCHEMA# dstack._internal.core.models.resources.ResourcesSpec
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/compatibility/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ def get_profile_excludes(profile: Optional[ProfileParams]) -> IncludeExcludeSetT
return excludes
if profile.backend_options is None:
excludes.add("backend_options")
if profile.instances is None:
excludes.add("instances")
return excludes


Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/compatibility/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType:
spec_excludes: IncludeExcludeDictType = {}
configuration_excludes: IncludeExcludeDictType = {}
profile_excludes = get_profile_excludes(run_spec.profile)
for field in get_profile_excludes(run_spec.configuration):
configuration_excludes[field] = True

if run_spec.configuration.backend_options is None:
configuration_excludes["backend_options"] = True
Expand Down
71 changes: 71 additions & 0 deletions src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,58 @@ def crons(self) -> List[str]:
return self.cron


class InstanceNameSelector(CoreModel):
name: Annotated[str, Field(description="The fleet instance name", min_length=1)]


class InstanceHostnameSelector(CoreModel):
hostname: Annotated[
str, Field(description="The fleet instance hostname or IP address", min_length=1)
]


def _parse_fleet_instance_selector_fleet(v: Any) -> Any:
if isinstance(v, str):
return EntityReference.parse(v)
return v
Comment thread
jvstme marked this conversation as resolved.


class FleetInstanceSelectorConfig(CoreConfig):
@staticmethod
def schema_extra(schema: Dict[str, Any]):
add_extra_schema_types(
schema["properties"]["fleet"],
extra_types=[{"type": "string", "minLength": 1}],
)


class FleetInstanceSelector(generate_dual_core_model(FleetInstanceSelectorConfig)):
fleet: Annotated[
EntityReference,
Field(
description=(
"The fleet reference. For fleets owned by the current project, specify"
" the fleet name. For a fleet from another project, specify"
" `<project name>/<fleet name>` or an object with `project` and `name`."
),
),
]
instance: Annotated[int, Field(description="The fleet instance number", ge=0)]

_validate_fleet = validator("fleet", pre=True, allow_reuse=True)(
_parse_fleet_instance_selector_fleet
)


InstanceSelector = Union[InstanceNameSelector, InstanceHostnameSelector, FleetInstanceSelector]


def parse_instance_selector(v: Union[InstanceSelector, str]) -> InstanceSelector:
if isinstance(v, str):
return InstanceNameSelector(name=v)
return v


class ProfileParamsConfig(CoreConfig):
@staticmethod
def schema_extra(schema: Dict[str, Any]):
Expand All @@ -249,6 +301,10 @@ def schema_extra(schema: Dict[str, Any]):
schema["properties"]["idle_duration"],
extra_types=[{"type": "string"}],
)
add_extra_schema_types(
schema["properties"]["instances"]["items"],
extra_types=[{"type": "string", "minLength": 1}],
)


class ProfileParams(CoreModel):
Expand Down Expand Up @@ -391,6 +447,18 @@ class ProfileParams(CoreModel):
),
),
] = None
instances: Annotated[
Optional[List[InstanceSelector]],
Field(
description=(
"The specific fleet instances to consider for reuse."
" Each value can be an instance name string, or an object with"
" `name`, `hostname`, or `fleet` and `instance`."
" When set, the run is only placed on matching existing instances."
),
min_items=1,
),
] = None
tags: Annotated[
Optional[Dict[str, str]],
Field(
Expand All @@ -416,6 +484,9 @@ class ProfileParams(CoreModel):
parse_idle_duration
)
_validate_fleets = validator("fleets", allow_reuse=True, each_item=True)(EntityReference.parse)
_validate_instances = validator("instances", pre=True, allow_reuse=True, each_item=True)(
parse_instance_selector
)
_validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator)
_validate_backend_options = validator("backend_options", allow_reuse=True)(
validate_backend_options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
get_instance_offers_in_fleet,
get_run_candidate_fleet_models_filters,
get_run_profile_and_requirements_in_fleet,
get_targeted_instance_offers,
select_run_candidate_fleet_models_with_filters,
)
from dstack._internal.server.services.runs.spec import (
Expand Down Expand Up @@ -494,6 +495,12 @@ async def _process_assignment(context: _SubmittedJobContext) -> _AssignmentResul
if not isinstance(preconditions, _ProcessedPreconditions):
return preconditions

if context.run.run_spec.merged_profile.instances is not None:
return await _select_targeted_instance_assignment(
context=context,
preconditions=preconditions,
)

candidate_fleet_models = await _load_assignment_candidate_fleets(context=context)
return await _select_assignment(
context=context,
Expand Down Expand Up @@ -533,6 +540,30 @@ async def _select_assignment(
return _NewCapacityAssignment(fleet_id=fleet_model.id)


async def _select_targeted_instance_assignment(
context: _SubmittedJobContext,
preconditions: _ProcessedPreconditions,
) -> _AssignmentResult:
async with get_session_ctx() as session:
instance_offers = await get_targeted_instance_offers(
session=session,
project=context.project,
run_spec=context.run.run_spec,
job=context.job,
master_job_provisioning_data=preconditions.master_job_provisioning_data,
volumes=preconditions.prepared_job_volumes.volumes,
exclude_not_available=True,
fleet_id=context.run_model.fleet_id,
)
if len(instance_offers) < _get_required_targeted_instance_offers(context):
return _NoFleetAssignment()
return _ExistingInstanceAssignment(
fleet_id=get_or_error(instance_offers[0][0].fleet_id),
master_job_provisioning_data=preconditions.master_job_provisioning_data,
volumes=preconditions.prepared_job_volumes.volumes,
)


async def _apply_assignment_result(
item: JobSubmittedPipelineItem,
context: _SubmittedJobContext,
Expand Down Expand Up @@ -621,6 +652,28 @@ async def _apply_assignment_result(
return

async with AsyncExitStack() as exit_stack:
if context.run.run_spec.merged_profile.instances is not None:
current_instance_offers = await _lock_targeted_instance_offers_for_assignment(
exit_stack=exit_stack,
session=session,
context=context,
assignment=assignment,
)
if len(current_instance_offers) < _get_required_targeted_instance_offers(context):
await _reset_job_lock_for_retry(session=session, item=item)
return

instance_model, current_offer = current_instance_offers[0]
_assign_instance_to_job(
session=session,
job_model=job_model,
instance_model=instance_model,
offer=current_offer,
multinode=context.multinode,
)
await _mark_job_processed(session=session, job_model=job_model)
return

fleet_model = await _lock_assignment_fleet_for_existing_instance_assignment(
exit_stack=exit_stack,
session=session,
Expand Down Expand Up @@ -905,6 +958,16 @@ async def _apply_no_fleet_selection(
job_model: JobModel,
run: Run,
) -> None:
if run.run_spec.merged_profile.instances is not None:
logger.debug("%s: failed to use specified instances", fmt(job_model))
await _terminate_submitted_job(
session=session,
job_model=job_model,
reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
message="Failed to use specified instances",
)
return

if run.run_spec.merged_profile.fleets is not None:
logger.debug("%s: failed to use specified fleets", fmt(job_model))
await _terminate_submitted_job(
Expand All @@ -927,6 +990,45 @@ async def _apply_no_fleet_selection(
)


async def _lock_targeted_instance_offers_for_assignment(
exit_stack: AsyncExitStack,
session: AsyncSession,
context: _SubmittedJobContext,
assignment: _ExistingInstanceAssignment,
) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]:
instance_offers = await get_targeted_instance_offers(
session=session,
project=context.project,
run_spec=context.run.run_spec,
job=context.job,
master_job_provisioning_data=assignment.master_job_provisioning_data,
volumes=assignment.volumes,
exclude_not_available=True,
fleet_id=assignment.fleet_id,
lock_instances=True,
)
instance_ids = sorted(instance.id for instance, _ in instance_offers)
if not instance_ids or not is_db_sqlite():
return instance_offers

await sqlite_commit(session)
await exit_stack.enter_async_context(
get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instance_ids)
)
return await get_targeted_instance_offers(
session=session,
project=context.project,
run_spec=context.run.run_spec,
job=context.job,
master_job_provisioning_data=assignment.master_job_provisioning_data,
volumes=assignment.volumes,
exclude_not_available=True,
fleet_id=assignment.fleet_id,
instance_ids=instance_ids,
lock_instances=True,
)


async def _lock_assignment_fleet_for_existing_instance_assignment(
exit_stack: AsyncExitStack,
session: AsyncSession,
Expand Down Expand Up @@ -2046,6 +2148,12 @@ def _select_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobM
return jobs_to_provision


def _get_required_targeted_instance_offers(context: _SubmittedJobContext) -> int:
if is_multinode_job(context.job) and is_master_job(context.job):
return len(context.jobs_to_provision)
return 1


def _release_replica_jobs_from_master_wait(
job_model: JobModel,
replica_job_models: list[JobModel],
Expand Down
Loading
Loading