Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
<= now - self._min_processing_interval * 2,
),
InstanceModel.last_processed_at == InstanceModel.created_at,
InstanceModel.skip_min_processing_interval == True,
),
or_(
and_(
Expand Down Expand Up @@ -235,6 +236,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
InstanceModel.lock_token,
InstanceModel.lock_expires_at,
InstanceModel.status,
InstanceModel.skip_min_processing_interval,
)
)
)
Expand All @@ -247,6 +249,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
instance_model.lock_expires_at = lock_expires_at
instance_model.lock_token = lock_token
instance_model.lock_owner = InstancePipeline.__name__
instance_model.skip_min_processing_interval = False
items.append(
InstancePipelineItem(
__tablename__=InstanceModel.__tablename__,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,18 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
JobModel.last_processed_at
<= now - self._min_processing_interval * 2,
),
JobModel.skip_min_processing_interval == True,
),
or_(
and_(
# Do not try to lock jobs if the run is waiting for the lock or terminating,
# but allow retrying jobs whose own lock is stale because
# the run pipeline cannot reclaim stale job locks.
RunModel.lock_owner.is_(None),
# the run pipeline cannot reclaim stale job locks, and allow jobs with
# skip_min_processing_interval set to speed up provisioning.
or_(
RunModel.lock_owner.is_(None),
JobModel.skip_min_processing_interval == True,
),
RunModel.status.not_in([RunStatus.TERMINATING]),
JobModel.lock_expires_at.is_(None),
),
Expand All @@ -252,6 +257,7 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
JobModel.lock_expires_at,
JobModel.status,
JobModel.replica_num,
JobModel.skip_min_processing_interval,
)
)
)
Expand All @@ -264,6 +270,7 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
job_model.lock_expires_at = lock_expires_at
job_model.lock_token = lock_token
job_model.lock_owner = JobRunningPipeline.__name__
job_model.skip_min_processing_interval = False
items.append(
JobRunningPipelineItem(
__tablename__=JobModel.__tablename__,
Expand Down Expand Up @@ -339,6 +346,7 @@ class _JobUpdateMap(ItemUpdateMap, total=False):
exit_status: Optional[int]
registered: bool
image_pull_progress: Optional[str]
skip_min_processing_interval: bool


@dataclass
Expand Down Expand Up @@ -643,6 +651,7 @@ async def _process_provisioning_status(
)
if success:
_set_job_status(context.job_model, result, JobStatus.PULLING)
result.job_update_map["skip_min_processing_interval"] = True
return
else:
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
RunModel.fleet_id.is_not(None),
),
or_(
JobModel.skip_min_processing_interval == True,
JobModel.last_processed_at <= now - self._min_processing_interval,
JobModel.last_processed_at == JobModel.submitted_at,
),
Expand All @@ -268,6 +269,7 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
JobModel.id,
JobModel.lock_token,
JobModel.lock_expires_at,
JobModel.skip_min_processing_interval,
)
)
)
Expand All @@ -280,6 +282,7 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
job_model.lock_expires_at = lock_expires_at
job_model.lock_token = lock_token
job_model.lock_owner = JobSubmittedPipeline.__name__
job_model.skip_min_processing_interval = False
items.append(
JobSubmittedPipelineItem(
__tablename__=JobModel.__tablename__,
Expand Down Expand Up @@ -325,6 +328,7 @@ async def process(self, item: JobSubmittedPipelineItem):
item=item,
provisioning=provisioning,
)
self._pipeline_hinter.hint_fetch(JobModel.__name__)
return

logger.debug("%s: assignment has started", fmt(context.job_model))
Expand Down Expand Up @@ -607,6 +611,7 @@ async def _apply_assignment_result(
)
job_model.fleet_id = assignment.fleet_id
job_model.instance_assigned = True
job_model.skip_min_processing_interval = True
await _mark_job_processed(session=session, job_model=job_model)
return

Expand Down Expand Up @@ -1059,6 +1064,7 @@ def _assign_instance_to_job(
job_model.used_instance_id = instance_model.id
job_model.job_provisioning_data = instance_model.job_provisioning_data
job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json()
job_model.skip_min_processing_interval = True

switch_instance_status(session, instance_model, InstanceStatus.BUSY)
instance_model.busy_blocks += offer.blocks
Expand Down Expand Up @@ -1254,6 +1260,8 @@ async def _apply_existing_instance_provisioning(
instance_model=instance_model,
volume_attachment_result=provisioning.volume_attachment_result,
)
if context.job_model.status == JobStatus.PROVISIONING:
context.job_model.skip_min_processing_interval = True
_release_replica_jobs_from_master_wait(
job_model=context.job_model,
replica_job_models=_get_job_models_by_ids(
Expand Down Expand Up @@ -1495,6 +1503,7 @@ async def _promote_or_create_instance_models_for_provisioned_jobs(
provisioned_job_model.fleet_id = fleet_model.id
provisioned_job_model.job_provisioning_data = job_provisioning_data.json()
switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING)
provisioned_job_model.skip_min_processing_interval = True

# If a placeholder instance exists, promote it instead of creating a new one.
# Safe to update the placeholder without locking: nobody else should update the placeholder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ async def fetch(self, limit: int) -> list[JobTerminatingPipelineItem]:
<= now - self._min_processing_interval * 2,
JobModel.volumes_detached_at.is_not(None),
),
JobModel.skip_min_processing_interval == True,
),
or_(
JobModel.lock_expires_at.is_(None),
Expand All @@ -205,6 +206,7 @@ async def fetch(self, limit: int) -> list[JobTerminatingPipelineItem]:
JobModel.lock_token,
JobModel.lock_expires_at,
JobModel.volumes_detached_at,
JobModel.skip_min_processing_interval,
)
)
)
Expand All @@ -217,6 +219,7 @@ async def fetch(self, limit: int) -> list[JobTerminatingPipelineItem]:
job_model.lock_expires_at = lock_expires_at
job_model.lock_token = lock_token
job_model.lock_owner = JobTerminatingPipeline.__name__
job_model.skip_min_processing_interval = False
items.append(
JobTerminatingPipelineItem(
__tablename__=JobModel.__tablename__,
Expand Down Expand Up @@ -280,6 +283,14 @@ async def process(self, item: JobTerminatingPipelineItem):
instance_model=instance_model,
result=result,
)
if (
result.instance_update_map is not None
and result.instance_update_map.get("status") == InstanceStatus.TERMINATING
):
self._pipeline_hinter.hint_fetch(InstanceModel.__name__)
# TODO: Hint RunPipeline to quickly move run to TERMINATED.
# Currently not implemented since it also requires making run eligible for processing.
# (This pipeline cannot modify runs so it's not simple).


class _JobUpdateMap(ItemUpdateMap, total=False):
Expand All @@ -299,6 +310,7 @@ class _InstanceUpdateMap(ItemUpdateMap, total=False):
termination_reason_message: Optional[str]
busy_blocks: int
last_job_processed_at: UpdateMapDateTime
skip_min_processing_interval: bool


class _VolumeUpdateRow(TypedDict):
Expand All @@ -318,6 +330,7 @@ class _ProcessResult:
volume_update_rows: list[_VolumeUpdateRow] = field(default_factory=list)
detached_volume_ids: set[uuid.UUID] = field(default_factory=set)
unassign_event_message: Optional[str] = None
graceful_stop_event_message: Optional[str] = None
replica_unregistration: Optional[_UnregisterReplicaResult] = (
None # None = not unregistered yet
)
Expand Down Expand Up @@ -567,6 +580,14 @@ async def _apply_process_result(
],
)

if result.graceful_stop_event_message is not None and instance_model is not None:
events.emit(
session,
result.graceful_stop_event_message,
actor=events.SystemActor(),
targets=[events.Target.from_model(job_model)],
)

if result.replica_unregistration is not None:
targets = [events.Target.from_model(job_model)]
if result.replica_unregistration.gateway_target is not None:
Expand Down Expand Up @@ -622,7 +643,9 @@ async def _process_terminating_job(
# Placeholder has no VM and no provisioning data. Skip graceful stop,
# container stop, and volume detach.
instance_update_map = get_or_error(result.instance_update_map)
instance_update_map["status"] = InstanceStatus.TERMINATING
if instance_model.status != InstanceStatus.TERMINATING:
instance_update_map["status"] = InstanceStatus.TERMINATING
instance_update_map["skip_min_processing_interval"] = True
instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED
result.job_update_map["instance_id"] = None
await _unregister_replica_and_update_result(result=result, job_model=job_model)
Expand All @@ -631,6 +654,7 @@ async def _process_terminating_job(

if job_model.graceful_termination_attempts == 0 and job_model.remove_at is None:
result.job_update_map = await _stop_job_gracefully(job_model, instance_model)
result.graceful_stop_event_message = "Graceful job stop requested"
return result

jrd = get_job_runtime_data(job_model)
Expand Down Expand Up @@ -666,7 +690,9 @@ async def _process_terminating_job(
if instance_model.status != InstanceStatus.BUSY or jpd is None or not jpd.dockerized:
if instance_model.status not in InstanceStatus.finished_statuses():
instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED
instance_update_map["status"] = InstanceStatus.TERMINATING
if instance_model.status != InstanceStatus.TERMINATING:
instance_update_map["status"] = InstanceStatus.TERMINATING
instance_update_map["skip_min_processing_interval"] = True
elif not [j for j in instance_model.jobs if j.id != job_model.id]:
instance_update_map["status"] = InstanceStatus.IDLE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]:
<= now - self._min_processing_interval * 2,
),
RunModel.last_processed_at == RunModel.submitted_at,
RunModel.skip_min_processing_interval == True,
),
or_(
RunModel.lock_expires_at.is_(None),
Expand All @@ -204,6 +205,7 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]:
RunModel.lock_token,
RunModel.lock_expires_at,
RunModel.status,
RunModel.skip_min_processing_interval,
)
)
)
Expand All @@ -216,6 +218,7 @@ async def fetch(self, limit: int) -> list[RunPipelineItem]:
run_model.lock_expires_at = lock_expires_at
run_model.lock_token = lock_token
run_model.lock_owner = RunPipeline.__name__
run_model.skip_min_processing_interval = False
items.append(
RunPipelineItem(
__tablename__=RunModel.__tablename__,
Expand Down Expand Up @@ -265,6 +268,7 @@ async def process(self, item: RunPipelineItem):
return
if item.status == RunStatus.TERMINATING:
await _process_terminating_item(item)
self._pipeline_hinter.hint_fetch(JobModel.__name__)
return

logger.error("Skipping run %s with unexpected status %s", item.id, item.status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class TerminatingRunJobUpdateMap(ItemUpdateMap, total=False):
status: JobStatus
termination_reason: Optional[JobTerminationReason]
graceful_termination_attempts: int
skip_min_processing_interval: bool


@dataclass
Expand Down Expand Up @@ -114,12 +115,14 @@ def _get_job_id_to_update_map(
job_id_to_update_map[job_id] = TerminatingRunJobUpdateMap(
status=JobStatus.TERMINATING,
termination_reason=job_termination_reason,
skip_min_processing_interval=True,
)
for job_id in delayed_job_ids:
job_id_to_update_map[job_id] = TerminatingRunJobUpdateMap(
status=JobStatus.TERMINATING,
termination_reason=job_termination_reason,
graceful_termination_attempts=0,
skip_min_processing_interval=True,
)
return job_id_to_update_map

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Add JobModel.skip_min_processing_interval

Revision ID: d9f6d27f0c41
Revises: 82b671d9c5ab
Create Date: 2026-04-24 07:15:00.000000+00:00

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "d9f6d27f0c41"  # unique ID of this migration
down_revision = "82b671d9c5ab"  # the migration this one applies on top of
branch_labels = None  # not the head of a named branch
depends_on = None  # no cross-branch dependencies


def upgrade() -> None:
    """Add the ``skip_min_processing_interval`` flag to the ``jobs`` table.

    The column is NOT NULL with a server-side default of ``false`` so that
    existing rows are backfilled automatically on migration.
    """
    flag_column = sa.Column(
        "skip_min_processing_interval",
        sa.Boolean(),
        server_default=sa.false(),
        nullable=False,
    )
    # batch_alter_table keeps the operation compatible with SQLite's
    # limited ALTER TABLE support.
    with op.batch_alter_table("jobs", schema=None) as batch_op:
        batch_op.add_column(flag_column)


def downgrade() -> None:
    """Drop ``jobs.skip_min_processing_interval``, reverting :func:`upgrade`."""
    # batch_alter_table keeps the operation compatible with SQLite's
    # limited ALTER TABLE support.
    with op.batch_alter_table("jobs", schema=None) as batch_op:
        batch_op.drop_column("skip_min_processing_interval")
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Add RunModel.skip_min_processing_interval

Revision ID: 8c1f8f4fcb47
Revises: d9f6d27f0c41
Create Date: 2026-04-27 07:20:00.000000+00:00

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "8c1f8f4fcb47"  # unique ID of this migration
down_revision = "d9f6d27f0c41"  # the migration this one applies on top of
branch_labels = None  # not the head of a named branch
depends_on = None  # no cross-branch dependencies


def upgrade() -> None:
    """Add the ``skip_min_processing_interval`` flag to the ``runs`` table.

    The column is NOT NULL with a server-side default of ``false`` so that
    existing rows are backfilled automatically on migration.
    """
    flag_column = sa.Column(
        "skip_min_processing_interval",
        sa.Boolean(),
        server_default=sa.false(),
        nullable=False,
    )
    # batch_alter_table keeps the operation compatible with SQLite's
    # limited ALTER TABLE support.
    with op.batch_alter_table("runs", schema=None) as batch_op:
        batch_op.add_column(flag_column)


def downgrade() -> None:
    """Drop ``runs.skip_min_processing_interval``, reverting :func:`upgrade`."""
    # batch_alter_table keeps the operation compatible with SQLite's
    # limited ALTER TABLE support.
    with op.batch_alter_table("runs", schema=None) as batch_op:
        batch_op.drop_column("skip_min_processing_interval")
Loading
Loading