From 97a3a2247b15b8f045f8d065318597c11abd2d9a Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 24 Apr 2026 18:26:03 +0500 Subject: [PATCH 1/9] Add skip_min_processing_interval for job provisioning --- .../background/pipeline_tasks/jobs_running.py | 5 +++ .../pipeline_tasks/jobs_submitted.py | 8 ++++ ...f0c41_add_jobmodel_skip_min_processing_.py | 37 +++++++++++++++++++ src/dstack/_internal/server/models.py | 3 ++ 4 files changed, 53 insertions(+) create mode 100644 src/dstack/_internal/server/migrations/versions/2026/04_24_0715_d9f6d27f0c41_add_jobmodel_skip_min_processing_.py diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index 7c7483a34..84b46d3cf 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -225,6 +225,7 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]: JobModel.last_processed_at <= now - self._min_processing_interval * 2, ), + JobModel.skip_min_processing_interval == True, ), or_( and_( @@ -252,6 +253,7 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]: JobModel.lock_expires_at, JobModel.status, JobModel.replica_num, + JobModel.skip_min_processing_interval, ) ) ) @@ -264,6 +266,7 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]: job_model.lock_expires_at = lock_expires_at job_model.lock_token = lock_token job_model.lock_owner = JobRunningPipeline.__name__ + job_model.skip_min_processing_interval = False items.append( JobRunningPipelineItem( __tablename__=JobModel.__tablename__, @@ -339,6 +342,7 @@ class _JobUpdateMap(ItemUpdateMap, total=False): exit_status: Optional[int] registered: bool image_pull_progress: Optional[str] + skip_min_processing_interval: bool @dataclass @@ -643,6 +647,7 @@ async def _process_provisioning_status( ) if success: _set_job_status(context.job_model, result, JobStatus.PULLING) + result.job_update_map["skip_min_processing_interval"] = True return else: logger.debug( diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 549855391..f150af645 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -246,6 +246,7 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]: RunModel.fleet_id.is_not(None), ), or_( + JobModel.skip_min_processing_interval == True, JobModel.last_processed_at <= now - self._min_processing_interval, JobModel.last_processed_at == JobModel.submitted_at, ), @@ -268,6 +269,7 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]: JobModel.id, JobModel.lock_token, JobModel.lock_expires_at, + JobModel.skip_min_processing_interval, ) ) ) @@ -280,6 +282,7 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]: job_model.lock_expires_at = lock_expires_at job_model.lock_token = lock_token job_model.lock_owner = JobSubmittedPipeline.__name__ + job_model.skip_min_processing_interval = False items.append( JobSubmittedPipelineItem( __tablename__=JobModel.__tablename__, @@ -607,6 +610,7 @@ async def _apply_assignment_result( ) job_model.fleet_id = assignment.fleet_id job_model.instance_assigned = True + job_model.skip_min_processing_interval = True await _mark_job_processed(session=session, job_model=job_model) return @@ -1059,6 +1063,7 @@ def _assign_instance_to_job( job_model.used_instance_id = instance_model.id job_model.job_provisioning_data = instance_model.job_provisioning_data job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json() + job_model.skip_min_processing_interval = True switch_instance_status(session, instance_model, InstanceStatus.BUSY) instance_model.busy_blocks += offer.blocks @@ -1254,6 +1259,8 @@ async def _apply_existing_instance_provisioning( instance_model=instance_model, volume_attachment_result=provisioning.volume_attachment_result, ) + if context.job_model.status == JobStatus.PROVISIONING: + context.job_model.skip_min_processing_interval = True _release_replica_jobs_from_master_wait( job_model=context.job_model, replica_job_models=_get_job_models_by_ids( @@ -1495,6 +1502,7 @@ async def _promote_or_create_instance_models_for_provisioned_jobs( provisioned_job_model.fleet_id = fleet_model.id provisioned_job_model.job_provisioning_data = job_provisioning_data.json() switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) + provisioned_job_model.skip_min_processing_interval = True # If a placeholder instance exists, promote it instead of creating a new one. # Safe to update the placeholder without locking: nobody else should update the placeholder. diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_24_0715_d9f6d27f0c41_add_jobmodel_skip_min_processing_.py b/src/dstack/_internal/server/migrations/versions/2026/04_24_0715_d9f6d27f0c41_add_jobmodel_skip_min_processing_.py new file mode 100644 index 000000000..927247077 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/04_24_0715_d9f6d27f0c41_add_jobmodel_skip_min_processing_.py @@ -0,0 +1,37 @@ +"""Add JobModel.skip_min_processing_interval + +Revision ID: d9f6d27f0c41 +Revises: 82b671d9c5ab +Create Date: 2026-04-24 07:15:00.000000+00:00 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "d9f6d27f0c41" +down_revision = "82b671d9c5ab" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "skip_min_processing_interval", + sa.Boolean(), + server_default=sa.false(), + nullable=False, + ) + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.drop_column("skip_min_processing_interval") + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index f8725a970..f76004ae6 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -526,6 +526,9 @@ class JobModel(PipelineModelMixin, BaseModel): submission_num: Mapped[int] = mapped_column(Integer) submitted_at: Mapped[datetime] = mapped_column(NaiveDateTime) last_processed_at: Mapped[datetime] = mapped_column(NaiveDateTime) + skip_min_processing_interval: Mapped[bool] = mapped_column( + Boolean, default=False, server_default=false() + ) status: Mapped[JobStatus] = mapped_column(EnumAsString(JobStatus, 100), index=True) """`status` must be changed only via `switch_job_status()`.""" termination_reason: Mapped[Optional[JobTerminationReason]] = mapped_column( From 8d1e06d6887196bddfe2922695a61db1f314c8a0 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 11:14:41 +0500 Subject: [PATCH 2/9] Prioritize running jobs processing with skip_min_processing_interval set --- .../server/background/pipeline_tasks/jobs_running.py | 8 ++++++-- .../server/background/pipeline_tasks/jobs_submitted.py | 1 + .../background/pipeline_tasks/test_submitted_jobs.py | 4 ++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index 84b46d3cf..e63e6dd5a 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -231,8 +231,12 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]: and_( # Do not try to lock jobs if the run is waiting for the lock or terminating, # but allow retrying jobs whose own lock is stale because - # the run pipeline cannot reclaim stale job locks. - RunModel.lock_owner.is_(None), + # the run pipeline cannot reclaim stale job locks, and allow jobs with + # skip_min_processing_interval set to speed up provisioning. + or_( + RunModel.lock_owner.is_(None), + JobModel.skip_min_processing_interval == True, + ), RunModel.status.not_in([RunStatus.TERMINATING]), JobModel.lock_expires_at.is_(None), ), diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index f150af645..9bf59b8fd 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -328,6 +328,7 @@ async def process(self, item: JobSubmittedPipelineItem): item=item, provisioning=provisioning, ) + self._pipeline_hinter.hint_fetch(JobModel.__name__) return logger.debug("%s: assignment has started", fmt(context.job_model)) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index 39c6c8c0f..cb705da5b 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -2,7 +2,7 @@ import uuid from datetime import timedelta from typing import cast -from unittest.mock import Mock, patch +from unittest.mock import Mock, call, patch import pytest from sqlalchemy import select @@ -652,7 +652,7 @@ async def test_defers_new_capacity_provisioning_until_fleet_master_is_elected( assert job.lock_token is None assert job.lock_expires_at is None hint_fetch = cast(Mock, worker._pipeline_hinter.hint_fetch) - hint_fetch.assert_called_once_with(FleetModel.__name__) + hint_fetch.assert_has_calls([call(FleetModel.__name__), call(JobModel.__name__)]) async def test_provisioning_non_master_job_ignores_cluster_master_fleet_lock( self, test_db, session: AsyncSession, worker: JobSubmittedWorker From 5e6bcae93e27b74957ba80423a898ca2cddc2244 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 11:51:02 +0500 Subject: [PATCH 3/9] Emit Graceful job stop requested event --- .../background/pipeline_tasks/jobs_terminating.py | 10 ++++++++++ .../background/pipeline_tasks/test_terminating_jobs.py | 3 +++ 2 files changed, 13 insertions(+) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 097c835e5..27a3fc454 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -318,6 +318,7 @@ class _ProcessResult: volume_update_rows: list[_VolumeUpdateRow] = field(default_factory=list) detached_volume_ids: set[uuid.UUID] = field(default_factory=set) unassign_event_message: Optional[str] = None + graceful_stop_event_message: Optional[str] = None replica_unregistration: Optional[_UnregisterReplicaResult] = ( None # None = not unregistered yet ) @@ -567,6 +568,14 @@ async def _apply_process_result( ], ) + if result.graceful_stop_event_message is not None and instance_model is not None: + events.emit( + session, + result.graceful_stop_event_message, + actor=events.SystemActor(), + targets=[events.Target.from_model(job_model)], + ) + if result.replica_unregistration is not None: targets = [events.Target.from_model(job_model)] if result.replica_unregistration.gateway_target is not None: @@ -631,6 +640,7 @@ async def _process_terminating_job( if job_model.graceful_termination_attempts == 0 and job_model.remove_at is None: result.job_update_map = await _stop_job_gracefully(job_model, instance_model) + result.graceful_stop_event_message = "Graceful job stop requested" return result jrd = get_job_runtime_data(job_model) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py index 1d003dce0..d44f20769 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py @@ -289,6 +289,9 @@ async def test_stops_job_gracefully_before_terminating_container( assert instance.lock_expires_at is None assert instance.lock_owner is None + events = await list_events(session) + assert any(event.message == "Graceful job stop requested" for event in events) + async def test_terminates_gracefully_stopped_job_after_remove_at( self, test_db, session: AsyncSession, worker: JobTerminatingWorker ): From 5dbe870b3c147ad54652afb37c7b627b125731e3 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 11:59:26 +0500 Subject: [PATCH 4/9] Hint RunPipeline from stop_runs --- src/dstack/_internal/server/routers/runs.py | 2 ++ src/dstack/_internal/server/services/runs/__init__.py | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/routers/runs.py b/src/dstack/_internal/server/routers/runs.py index acb24b4ee..cce118b6b 100644 --- a/src/dstack/_internal/server/routers/runs.py +++ b/src/dstack/_internal/server/routers/runs.py @@ -180,6 +180,7 @@ async def stop_runs( body: StopRunsRequest, session: AsyncSession = Depends(get_session), user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()), + pipeline_hinter: PipelineHinterProtocol = Depends(get_pipeline_hinter), ): """ Stop one or more runs. @@ -191,6 +192,7 @@ async def stop_runs( project=project, runs_names=body.runs_names, abort=body.abort, + pipeline_hinter=pipeline_hinter, ) diff --git a/src/dstack/_internal/server/services/runs/__init__.py b/src/dstack/_internal/server/services/runs/__init__.py index 5f5d97e47..6ebaa3bfe 100644 --- a/src/dstack/_internal/server/services/runs/__init__.py +++ b/src/dstack/_internal/server/services/runs/__init__.py @@ -696,6 +696,7 @@ async def stop_runs( project: ProjectModel, runs_names: List[str], abort: bool, + pipeline_hinter: Optional[PipelineHinterProtocol] = None, ): res = await session.execute( select(RunModel).where( @@ -716,7 +717,6 @@ async def stop_runs( .execution_options(populate_existing=True) ) run_models = res.scalars().all() - now = common_utils.get_current_datetime() for run_model in run_models: if run_model.status.is_finished(): continue @@ -727,9 +727,10 @@ async def stop_runs( switch_run_status( session, run_model, RunStatus.TERMINATING, actor=events.UserActor.from_user(user) ) - run_model.last_processed_at = now # The run will be terminated by RunPipeline. await session.commit() + if pipeline_hinter is not None: + pipeline_hinter.hint_fetch(RunModel.__name__) async def delete_runs( From 606ca5b8889ed2afa916e1fbb7d6500ea61a5778 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 12:24:06 +0500 Subject: [PATCH 5/9] Add skip_min_processing_interval for run stop --- .../pipeline_tasks/runs/__init__.py | 3 ++ ...fcb47_add_runmodel_skip_min_processing_.py | 37 +++++++++++++++++++ src/dstack/_internal/server/models.py | 3 ++ .../server/services/runs/__init__.py | 1 + .../pipeline_tasks/test_runs/test_pipeline.py | 17 +++++++++ 5 files changed, 61 insertions(+) create mode 100644 src/dstack/_internal/server/migrations/versions/2026/04_27_0720_8c1f8f4fcb47_add_runmodel_skip_min_processing_.py diff --git a/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py index 439f2814a..20b393844 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py @@ -185,6 +185,7 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]: <= now - self._min_processing_interval * 2, ), RunModel.last_processed_at == RunModel.submitted_at, + RunModel.skip_min_processing_interval == True, ), or_( RunModel.lock_expires_at.is_(None), @@ -204,6 +205,7 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]: RunModel.lock_token, RunModel.lock_expires_at, RunModel.status, + RunModel.skip_min_processing_interval, ) ) ) @@ -216,6 +218,7 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]: run_model.lock_expires_at = lock_expires_at run_model.lock_token = lock_token run_model.lock_owner = RunPipeline.__name__ + run_model.skip_min_processing_interval = False items.append( RunPipelineItem( __tablename__=RunModel.__tablename__, diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_27_0720_8c1f8f4fcb47_add_runmodel_skip_min_processing_.py b/src/dstack/_internal/server/migrations/versions/2026/04_27_0720_8c1f8f4fcb47_add_runmodel_skip_min_processing_.py new file mode 100644 index 000000000..a8f4abac0 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/04_27_0720_8c1f8f4fcb47_add_runmodel_skip_min_processing_.py @@ -0,0 +1,37 @@ +"""Add RunModel.skip_min_processing_interval + +Revision ID: 8c1f8f4fcb47 +Revises: d9f6d27f0c41 +Create Date: 2026-04-27 07:20:00.000000+00:00 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "8c1f8f4fcb47" +down_revision = "d9f6d27f0c41" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "skip_min_processing_interval", + sa.Boolean(), + server_default=sa.false(), + nullable=False, + ) + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.drop_column("skip_min_processing_interval") + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index f76004ae6..7fc99d29f 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -428,6 +428,9 @@ class RunModel(PipelineModelMixin, BaseModel): run_name: Mapped[str] = mapped_column(String(100)) submitted_at: Mapped[datetime] = mapped_column(NaiveDateTime) last_processed_at: Mapped[datetime] = mapped_column(NaiveDateTime) + skip_min_processing_interval: Mapped[bool] = mapped_column( + Boolean, default=False, server_default=false() + ) next_triggered_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) status: Mapped[RunStatus] = mapped_column(EnumAsString(RunStatus, 100), index=True) """`status` must be changed only via `switch_run_status()`.""" diff --git a/src/dstack/_internal/server/services/runs/__init__.py b/src/dstack/_internal/server/services/runs/__init__.py index 6ebaa3bfe..c32c3971e 100644 --- a/src/dstack/_internal/server/services/runs/__init__.py +++ b/src/dstack/_internal/server/services/runs/__init__.py @@ -727,6 +727,7 @@ async def stop_runs( switch_run_status( session, run_model, RunStatus.TERMINATING, actor=events.UserActor.from_user(user) ) + run_model.skip_min_processing_interval = True # The run will be terminated by RunPipeline. await session.commit() if pipeline_hinter is not None: diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_pipeline.py b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_pipeline.py index 51a27fcc1..feda67cf3 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_pipeline.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_pipeline.py @@ -106,6 +106,18 @@ async def test_fetch_selects_eligible_runs_and_sets_lock_fields( submitted_at=now, last_processed_at=now + dt.timedelta(seconds=10), ) + recent_terminating_skip = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_name="recent-terminating-skip", + status=RunStatus.TERMINATING, + submitted_at=now, + last_processed_at=now + dt.timedelta(seconds=9), + ) + recent_terminating_skip.skip_min_processing_interval = True + await session.commit() items = await fetcher.fetch(limit=10) @@ -115,6 +127,7 @@ async def test_fetch_selects_eligible_runs_and_sets_lock_fields( pending_retry.id, pending_scheduled_ready.id, pending_zero_scaled.id, + recent_terminating_skip.id, } assert {item.id: item.status for item in items} == { submitted.id: RunStatus.SUBMITTED, @@ -122,6 +135,7 @@ async def test_fetch_selects_eligible_runs_and_sets_lock_fields( pending_retry.id: RunStatus.PENDING, pending_scheduled_ready.id: RunStatus.PENDING, pending_zero_scaled.id: RunStatus.PENDING, + recent_terminating_skip.id: RunStatus.TERMINATING, } for run in [ @@ -133,6 +147,7 @@ async def test_fetch_selects_eligible_runs_and_sets_lock_fields( future_scheduled, finished, recent, + recent_terminating_skip, ]: await session.refresh(run) @@ -142,10 +157,12 @@ async def test_fetch_selects_eligible_runs_and_sets_lock_fields( pending_retry, pending_scheduled_ready, pending_zero_scaled, + recent_terminating_skip, ] assert all(run.lock_owner == RunPipeline.__name__ for run in fetched_runs) assert all(run.lock_expires_at is not None for run in fetched_runs) assert all(run.lock_token is not None for run in fetched_runs) + assert all(not run.skip_min_processing_interval for run in fetched_runs) assert len({run.lock_token for run in fetched_runs}) == 1 assert future_scheduled.lock_owner is None From f677f82c433a43f3b77bf2f530a119d941de1c09 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 13:53:54 +0500 Subject: [PATCH 6/9] Add skip_min_processing_interval for job termination --- .../background/pipeline_tasks/jobs_terminating.py | 3 +++ .../background/pipeline_tasks/runs/__init__.py | 1 + .../background/pipeline_tasks/runs/terminating.py | 3 +++ .../pipeline_tasks/test_runs/test_termination.py | 3 +++ .../pipeline_tasks/test_terminating_jobs.py | 14 +++++++++++++- 5 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 27a3fc454..902deaead 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -186,6 +186,7 @@ async def fetch(self, limit: int) -> list[JobTerminatingPipelineItem]: <= now - self._min_processing_interval * 2, JobModel.volumes_detached_at.is_not(None), ), + JobModel.skip_min_processing_interval == True, ), or_( JobModel.lock_expires_at.is_(None), @@ -205,6 +206,7 @@ async def fetch(self, limit: int) -> list[JobTerminatingPipelineItem]: JobModel.lock_token, JobModel.lock_expires_at, JobModel.volumes_detached_at, + JobModel.skip_min_processing_interval, ) ) ) @@ -217,6 +219,7 @@ async def fetch(self, limit: int) -> list[JobTerminatingPipelineItem]: job_model.lock_expires_at = lock_expires_at job_model.lock_token = lock_token job_model.lock_owner = JobTerminatingPipeline.__name__ + job_model.skip_min_processing_interval = False items.append( JobTerminatingPipelineItem( __tablename__=JobModel.__tablename__, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py index 20b393844..c26df9d4d 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/runs/__init__.py @@ -268,6 +268,7 @@ async def process(self, item: RunPipelineItem): return if item.status == RunStatus.TERMINATING: await _process_terminating_item(item) + self._pipeline_hinter.hint_fetch(JobModel.__name__) return logger.error("Skipping run %s with unexpected status %s", item.id, item.status) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py index 4e657349e..eece7dfa7 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py @@ -36,6 +36,7 @@ class TerminatingRunJobUpdateMap(ItemUpdateMap, total=False): status: JobStatus termination_reason: Optional[JobTerminationReason] graceful_termination_attempts: int + skip_min_processing_interval: bool @dataclass @@ -114,12 +115,14 @@ def _get_job_id_to_update_map( job_id_to_update_map[job_id] = TerminatingRunJobUpdateMap( status=JobStatus.TERMINATING, termination_reason=job_termination_reason, + skip_min_processing_interval=True, ) for job_id in delayed_job_ids: job_id_to_update_map[job_id] = TerminatingRunJobUpdateMap( status=JobStatus.TERMINATING, termination_reason=job_termination_reason, graceful_termination_attempts=0, + skip_min_processing_interval=True, ) return job_id_to_update_map diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py index ab80677f7..69016a915 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py @@ -95,6 +95,7 @@ async def test_transitions_running_jobs_to_terminating( assert job.status == JobStatus.TERMINATING assert job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER assert job.graceful_termination_attempts == 0 + assert job.skip_min_processing_interval assert job.remove_at is None assert job.lock_token is None assert job.lock_expires_at is None @@ -147,10 +148,12 @@ async def test_updates_delayed_and_regular_jobs_separately( assert delayed_job.status == JobStatus.TERMINATING assert delayed_job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER assert delayed_job.graceful_termination_attempts == 0 + assert delayed_job.skip_min_processing_interval assert delayed_job.remove_at is None assert regular_job.status == JobStatus.TERMINATING assert regular_job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER assert regular_job.graceful_termination_attempts is None + assert regular_job.skip_min_processing_interval assert regular_job.remove_at is None async def test_finishes_non_scheduled_run_when_all_jobs_are_finished( diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py index d44f20769..6bc3a433b 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py @@ -129,6 +129,14 @@ async def test_fetch_selects_eligible_jobs_and_sets_lock_fields( submitted_at=stale - timedelta(minutes=2), last_processed_at=now, ) + recent_skip = await create_job( + session=session, + run=run, + status=JobStatus.TERMINATING, + submitted_at=stale - timedelta(minutes=2), + last_processed_at=now, + ) + recent_skip.skip_min_processing_interval = True locked = await create_job( session=session, @@ -159,11 +167,13 @@ async def test_fetch_selects_eligible_jobs_and_sets_lock_fields( terminating.id, past_remove_at.id, expired_same_owner.id, + recent_skip.id, ] assert {(item.id, item.volumes_detached_at) for item in items} == { (terminating.id, None), (past_remove_at.id, past_remove_at.volumes_detached_at), (expired_same_owner.id, None), + (recent_skip.id, None), } for job in [ @@ -172,15 +182,17 @@ async def test_fetch_selects_eligible_jobs_and_sets_lock_fields( future_remove_at, non_terminating, recent, + recent_skip, locked, expired_same_owner, ]: await session.refresh(job) - fetched_jobs = [terminating, past_remove_at, expired_same_owner] + fetched_jobs = [terminating, past_remove_at, expired_same_owner, recent_skip] assert all(job.lock_owner == JobTerminatingPipeline.__name__ for job in fetched_jobs) assert all(job.lock_expires_at is not None for job in fetched_jobs) assert all(job.lock_token is not None for job in fetched_jobs) + assert all(not job.skip_min_processing_interval for job in fetched_jobs) assert len({job.lock_token for job in fetched_jobs}) == 1 assert future_remove_at.lock_owner is None From f96ffc1005224408417a899f0e04fbebfd886d7c Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 14:22:36 +0500 Subject: [PATCH 7/9] Comment on hinting RunPipeline from JobTerminatingPipeline --- .../server/background/pipeline_tasks/jobs_terminating.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 902deaead..6daddb80e 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -283,6 +283,9 @@ async def process(self, item: JobTerminatingPipelineItem): instance_model=instance_model, result=result, ) + # TODO: Hint RunPipeline to quickly move run to TERMINATED. + # Currently not implemented since it also requires making run eligible for processing. + # (This pipeline cannot modify runs so it's not simple). class _JobUpdateMap(ItemUpdateMap, total=False): From be020733edad8332153354a11933091683d94460 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 14:37:19 +0500 Subject: [PATCH 8/9] Add skip_min_processing_interval for job-instance termination --- .../pipeline_tasks/instances/__init__.py | 3 ++ .../pipeline_tasks/jobs_terminating.py | 14 ++++++- ..._add_instancemodel_skip_min_processing_.py | 37 +++++++++++++++++++ src/dstack/_internal/server/models.py | 3 ++ 4 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/2026/04_27_1030_05c351d08f6b_add_instancemodel_skip_min_processing_.py diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py index 5b1ce5a26..343b05f81 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py @@ -207,6 +207,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]: <= now - self._min_processing_interval * 2, ), InstanceModel.last_processed_at == InstanceModel.created_at, + InstanceModel.skip_min_processing_interval == True, ), or_( and_( @@ -235,6 +236,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]: InstanceModel.lock_token, InstanceModel.lock_expires_at, InstanceModel.status, + InstanceModel.skip_min_processing_interval, ) ) ) @@ -247,6 +249,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]: instance_model.lock_expires_at = lock_expires_at instance_model.lock_token = lock_token instance_model.lock_owner = InstancePipeline.__name__ + instance_model.skip_min_processing_interval = False items.append( InstancePipelineItem( __tablename__=InstanceModel.__tablename__, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 6daddb80e..e15c24db5 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -283,6 +283,11 @@ async def process(self, item: JobTerminatingPipelineItem): instance_model=instance_model, result=result, ) + if ( + result.instance_update_map is not None + and result.instance_update_map.get("status") == InstanceStatus.TERMINATING + ): + self._pipeline_hinter.hint_fetch(InstanceModel.__name__) # TODO: Hint RunPipeline to quickly move run to TERMINATED. # Currently not implemented since it also requires making run eligible for processing. # (This pipeline cannot modify runs so it's not simple). @@ -305,6 +310,7 @@ class _InstanceUpdateMap(ItemUpdateMap, total=False): termination_reason_message: Optional[str] busy_blocks: int last_job_processed_at: UpdateMapDateTime + skip_min_processing_interval: bool class _VolumeUpdateRow(TypedDict): @@ -637,7 +643,9 @@ async def _process_terminating_job( # Placeholder has no VM and no provisioning data. Skip graceful stop, # container stop, and volume detach. instance_update_map = get_or_error(result.instance_update_map) - instance_update_map["status"] = InstanceStatus.TERMINATING + if instance_model.status != InstanceStatus.TERMINATING: + instance_update_map["status"] = InstanceStatus.TERMINATING + instance_update_map["skip_min_processing_interval"] = True instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED result.job_update_map["instance_id"] = None await _unregister_replica_and_update_result(result=result, job_model=job_model) @@ -682,7 +690,9 @@ async def _process_terminating_job( if instance_model.status != InstanceStatus.BUSY or jpd is None or not jpd.dockerized: if instance_model.status not in InstanceStatus.finished_statuses(): instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED - instance_update_map["status"] = InstanceStatus.TERMINATING + if instance_model.status != InstanceStatus.TERMINATING: + instance_update_map["status"] = InstanceStatus.TERMINATING + instance_update_map["skip_min_processing_interval"] = True elif not [j for j in instance_model.jobs if j.id != job_model.id]: instance_update_map["status"] = InstanceStatus.IDLE diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_27_1030_05c351d08f6b_add_instancemodel_skip_min_processing_.py b/src/dstack/_internal/server/migrations/versions/2026/04_27_1030_05c351d08f6b_add_instancemodel_skip_min_processing_.py new file mode 100644 index 000000000..d44ee41d5 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/04_27_1030_05c351d08f6b_add_instancemodel_skip_min_processing_.py @@ -0,0 +1,37 @@ +"""Add InstanceModel.skip_min_processing_interval + +Revision ID: 05c351d08f6b +Revises: 8c1f8f4fcb47 +Create Date: 2026-04-27 10:30:00.000000+00:00 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "05c351d08f6b" +down_revision = "8c1f8f4fcb47" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "skip_min_processing_interval", + sa.Boolean(), + server_default=sa.false(), + nullable=False, + ) + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.drop_column("skip_min_processing_interval") + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 7fc99d29f..565aec51f 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -747,6 +747,9 @@ class InstanceModel(PipelineModelMixin, BaseModel): last_processed_at: Mapped[datetime] = mapped_column( NaiveDateTime, default=get_current_datetime ) + skip_min_processing_interval: Mapped[bool] = mapped_column( + Boolean, default=False, server_default=false() + ) deleted: Mapped[bool] = mapped_column(Boolean, default=False) deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) From 612aa377fcf24328db64a94b8c5c807ef27fd05c Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 27 Apr 2026 14:51:15 +0500 Subject: [PATCH 9/9] Add skip_min_processing_interval for instances termination --- src/dstack/_internal/server/routers/fleets.py | 4 ++++ src/dstack/_internal/server/services/fleets.py | 18 +++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/routers/fleets.py b/src/dstack/_internal/server/routers/fleets.py index 4aea9a03c..0f2c3bcab 100644 --- a/src/dstack/_internal/server/routers/fleets.py +++ b/src/dstack/_internal/server/routers/fleets.py @@ -200,6 +200,7 @@ async def delete_fleets( body: DeleteFleetsRequest, session: AsyncSession = Depends(get_session), user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()), + pipeline_hinter: PipelineHinterProtocol = Depends(get_pipeline_hinter), ): """ Deletes one or more fleets. @@ -210,6 +211,7 @@ async def delete_fleets( project=project, user=user, names=body.names, + pipeline_hinter=pipeline_hinter, ) @@ -218,6 +220,7 @@ async def delete_fleet_instances( body: DeleteFleetInstancesRequest, session: AsyncSession = Depends(get_session), user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()), + pipeline_hinter: PipelineHinterProtocol = Depends(get_pipeline_hinter), ): """ Deletes one or more instances within the fleet. @@ -229,4 +232,5 @@ async def delete_fleet_instances( user=user, names=[body.name], instance_nums=body.instance_nums, + pipeline_hinter=pipeline_hinter, ) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index f6d9c99d6..72633d8ce 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -754,6 +754,7 @@ async def delete_fleets( user: UserModel, names: List[str], instance_nums: Optional[List[int]] = None, + pipeline_hinter: Optional[PipelineHinterProtocol] = None, ): res = await session.execute( select(FleetModel.id) @@ -856,9 +857,13 @@ async def delete_fleets( logger.info( "Deleting fleets %s instances %s", [f.name for f in fleet_models], instance_nums ) + hint_instance_pipeline = False for fleet_model in fleet_models: - _terminate_fleet_instances( - session=session, fleet_model=fleet_model, instance_nums=instance_nums, actor=user + hint_instance_pipeline |= _terminate_fleet_instances( + session=session, + fleet_model=fleet_model, + instance_nums=instance_nums, + actor=user, ) # TERMINATING fleets are deleted by FleetPipeline after instances are terminated if instance_nums is None: @@ -869,6 +874,8 @@ async def delete_fleets( actor=events.UserActor.from_user(user), ) await session.commit() + if hint_instance_pipeline and pipeline_hinter is not None: + pipeline_hinter.hint_fetch(InstanceModel.__name__) def fleet_model_to_fleet( @@ -1431,7 +1438,8 @@ def _terminate_fleet_instances( fleet_model: FleetModel, instance_nums: Optional[List[int]], actor: UserModel, -): +) -> bool: + hint_instance_pipeline = False if is_fleet_in_use(fleet_model, instance_nums=instance_nums): if instance_nums is not None: raise ServerClientError( @@ -1447,9 +1455,13 @@ def _terminate_fleet_instances( instance.deleted = True else: instance.termination_reason = InstanceTerminationReason.TERMINATED_BY_USER + if instance.status != InstanceStatus.TERMINATING: + instance.skip_min_processing_interval = True + hint_instance_pipeline = True switch_instance_status( session, instance, InstanceStatus.TERMINATING, actor=events.UserActor.from_user(actor), ) + return hint_instance_pipeline