From 562e88ab3196c03432fc8c5ee6eb295d235529e9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 7 Mar 2026 22:03:51 +0000 Subject: [PATCH 1/6] feat(cli): auto-cancel dev runs on CLI exit via detached watchdog When the dev CLI exits (e.g. ctrl+c via pnpm), runs that were mid-execution previously stayed stuck in EXECUTING status for up to 5 minutes until the heartbeat timeout fired. Now they are cancelled within seconds. The dev CLI spawns a lightweight detached watchdog process at startup. The watchdog monitors the CLI process ID and, when it detects the CLI has exited, calls a new POST /engine/v1/dev/disconnect endpoint to cancel all in-flight runs immediately (skipping PENDING_CANCEL since the worker is known to be dead). Watchdog design: - Fully detached (detached: true, stdio: ignore, unref()) so it survives even when pnpm sends SIGKILL to the process tree - Active run IDs maintained via atomic file write (.trigger/active-runs.json) - Single-instance guarantee via PID file (.trigger/watchdog.pid) - Safety timeout: exits after 24 hours to prevent zombie processes - On clean shutdown, the watchdog is killed (no disconnect needed) Disconnect endpoint: - Rate-limited: 5 calls/min per environment - Capped at 500 runs per call - Small counts (<= 25): cancelled inline with pMap concurrency 10 - Large counts: delegated to the bulk action system - Uses finalizeRun: true to skip PENDING_CANCEL and go straight to FINISHED Run engine change: - cancelRun() now respects finalizeRun when the run is in EXECUTING status, skipping the PENDING_CANCEL waiting state and going directly to FINISHED --- .changeset/strange-moles-provide.md | 6 + .server-changes/dev-cli-disconnect-md | 6 + .../app/routes/engine.v1.dev.disconnect.ts | 169 ++++++++++++++++++ .../app/v3/services/cancelTaskRun.server.ts | 3 + .../src/engine/systems/runAttemptSystem.ts | 52 +++--- packages/cli-v3/src/apiClient.ts | 20 +++ packages/cli-v3/src/dev/devSupervisor.ts | 116 +++++++++++- packages/cli-v3/src/dev/devWatchdog.ts | 155 ++++++++++++++++ packages/core/src/v3/schemas/api.ts | 11 ++ 9 files changed, 512 insertions(+), 26 deletions(-) create mode 100644 .changeset/strange-moles-provide.md create mode 100644 .server-changes/dev-cli-disconnect-md create mode 100644 apps/webapp/app/routes/engine.v1.dev.disconnect.ts create mode 100644 packages/cli-v3/src/dev/devWatchdog.ts diff --git a/.changeset/strange-moles-provide.md b/.changeset/strange-moles-provide.md new file mode 100644 index 00000000000..3ad85ea0739 --- /dev/null +++ b/.changeset/strange-moles-provide.md @@ -0,0 +1,6 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Auto-cancel in-flight dev runs when the CLI exits, using a detached watchdog process that survives pnpm SIGKILL diff --git a/.server-changes/dev-cli-disconnect-md b/.server-changes/dev-cli-disconnect-md new file mode 100644 index 00000000000..a0790d70765 --- /dev/null +++ b/.server-changes/dev-cli-disconnect-md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Added `/engine/v1/dev/disconnect` endpoint to auto-cancel runs when the CLI disconnects. Maximum of 500 runs can be cancelled. Uses the bulk action system when there are more than 25 runs to cancel. \ No newline at end of file diff --git a/apps/webapp/app/routes/engine.v1.dev.disconnect.ts b/apps/webapp/app/routes/engine.v1.dev.disconnect.ts new file mode 100644 index 00000000000..4658700ad97 --- /dev/null +++ b/apps/webapp/app/routes/engine.v1.dev.disconnect.ts @@ -0,0 +1,169 @@ +import { json } from "@remix-run/server-runtime"; +import { Ratelimit } from "@upstash/ratelimit"; +import { tryCatch } from "@trigger.dev/core"; +import { DevDisconnectRequestBody } from "@trigger.dev/core/v3"; +import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; +import { BulkActionNotificationType, BulkActionType } from "@trigger.dev/database"; +import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; +import { RateLimiter } from "~/services/rateLimiter.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server"; +import { commonWorker } from "~/v3/commonWorker.server"; +import pMap from "p-map"; + +const CANCEL_REASON = "Dev session ended (CLI exited)"; + +// Below this threshold, cancel runs inline with pMap. +// Above it, create a bulk action and process asynchronously. +const BULK_ACTION_THRESHOLD = 25; + +// Maximum number of runs that can be cancelled in a single disconnect call. +const MAX_RUNS = 500; + +// Rate limit: 5 calls per minute per environment +const disconnectRateLimiter = new RateLimiter({ + keyPrefix: "dev-disconnect", + limiter: Ratelimit.fixedWindow(5, "1 m"), + logFailure: true, +}); + +const { action } = createActionApiRoute( + { + body: DevDisconnectRequestBody, + maxContentLength: 1024 * 256, // 256KB + method: "POST", + }, + async ({ authentication, body }) => { + const environmentId = authentication.environment.id; + + // Rate limit per environment + const rateLimitResult = await disconnectRateLimiter.limit(environmentId); + if (!rateLimitResult.success) { + return json( + { error: "Rate limit exceeded", retryAfter: Math.ceil((rateLimitResult.reset - Date.now()) / 1000) }, + { status: 429 } + ); + } + + // Cap the number of runs + const runFriendlyIds = body.runFriendlyIds.slice(0, MAX_RUNS); + + if (runFriendlyIds.length === 0) { + return json({ cancelled: 0 }, { status: 200 }); + } + + logger.info("Dev disconnect: cancelling runs", { + environmentId, + runCount: runFriendlyIds.length, + capped: body.runFriendlyIds.length > MAX_RUNS, + }); + + // For small numbers of runs, cancel inline + if (runFriendlyIds.length <= BULK_ACTION_THRESHOLD) { + const cancelled = await cancelRunsInline(runFriendlyIds, environmentId); + return json({ cancelled }, { status: 200 }); + } + + // For large numbers, create a bulk action to process asynchronously + const bulkActionId = await createBulkCancelAction( + runFriendlyIds, + authentication.environment.project.id, + environmentId + ); + + logger.info("Dev disconnect: created bulk action for large run set", { + environmentId, + bulkActionId, + runCount: runFriendlyIds.length, + }); + + return json({ cancelled: 0, bulkActionId }, { status: 200 }); + } +); + +async function cancelRunsInline( + runFriendlyIds: string[], + environmentId: string +): Promise { + const runIds = runFriendlyIds.map((fid) => RunId.toId(fid)); + + const runs = await prisma.taskRun.findMany({ + where: { + id: { in: runIds }, + runtimeEnvironmentId: environmentId, + }, + select: { + id: true, + engine: true, + friendlyId: true, + status: true, + createdAt: true, + completedAt: true, + taskEventStore: true, + }, + }); + + let cancelled = 0; + const cancelService = new CancelTaskRunService(prisma); + + await pMap( + runs, + async (run) => { + const [error, result] = await tryCatch( + cancelService.call(run, { reason: CANCEL_REASON, finalizeRun: true }) + ); + + if (error) { + logger.error("Dev disconnect: failed to cancel run", { + runId: run.id, + error, + }); + } else if (result && !result.alreadyFinished) { + cancelled++; + } + }, + { concurrency: 10 } + ); + + logger.info("Dev disconnect: completed inline cancellation", { + environmentId, + cancelled, + total: runFriendlyIds.length, + }); + + return cancelled; +} + +async function createBulkCancelAction( + runFriendlyIds: string[], + projectId: string, + environmentId: string +): Promise { + const { id, friendlyId } = BulkActionId.generate(); + + await prisma.bulkActionGroup.create({ + data: { + id, + friendlyId, + projectId, + environmentId, + name: "Dev session disconnect", + type: BulkActionType.CANCEL, + params: { runId: runFriendlyIds }, + queryName: "bulk_action_v1", + totalCount: runFriendlyIds.length, + completionNotification: BulkActionNotificationType.NONE, + }, + }); + + await commonWorker.enqueue({ + id: `processBulkAction-${id}`, + job: "processBulkAction", + payload: { bulkActionId: id }, + }); + + return friendlyId; +} + +export { action }; diff --git a/apps/webapp/app/v3/services/cancelTaskRun.server.ts b/apps/webapp/app/v3/services/cancelTaskRun.server.ts index 0c27a0b957f..76db56535ae 100644 --- a/apps/webapp/app/v3/services/cancelTaskRun.server.ts +++ b/apps/webapp/app/v3/services/cancelTaskRun.server.ts @@ -8,6 +8,8 @@ export type CancelTaskRunServiceOptions = { cancelAttempts?: boolean; cancelledAt?: Date; bulkActionId?: string; + /** Skip PENDING_CANCEL and finalize immediately (use when the worker is known to be dead). */ + finalizeRun?: boolean; }; type CancelTaskRunServiceResult = { @@ -57,6 +59,7 @@ export class CancelTaskRunService extends BaseService { runId: taskRun.id, completedAt: options?.cancelledAt, reason: options?.reason, + finalizeRun: options?.finalizeRun, bulkActionId: options?.bulkActionId, tx: this._prisma, }); diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 2d10e756b5b..067d00a14e0 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -1436,35 +1436,39 @@ export class RunAttemptSystem { }); //if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status + //unless finalizeRun is true (worker is known to be dead), in which case skip straight to FINISHED if ( isExecuting(latestSnapshot.executionStatus) || isPendingExecuting(latestSnapshot.executionStatus) ) { - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { - run, - snapshot: { - executionStatus: "PENDING_CANCEL", - description: "Run was cancelled", - }, - previousSnapshotId: latestSnapshot.id, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - projectId: latestSnapshot.projectId, - organizationId: latestSnapshot.organizationId, - workerId, - runnerId, - }); + if (!finalizeRun) { + const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { + run, + snapshot: { + executionStatus: "PENDING_CANCEL", + description: "Run was cancelled", + }, + previousSnapshotId: latestSnapshot.id, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + projectId: latestSnapshot.projectId, + organizationId: latestSnapshot.organizationId, + workerId, + runnerId, + }); - //the worker needs to be notified so it can kill the run and complete the attempt - await sendNotificationToWorker({ - runId, - snapshot: newSnapshot, - eventBus: this.$.eventBus, - }); - return { - alreadyFinished: false, - ...executionResultFromSnapshot(newSnapshot), - }; + //the worker needs to be notified so it can kill the run and complete the attempt + await sendNotificationToWorker({ + runId, + snapshot: newSnapshot, + eventBus: this.$.eventBus, + }); + return { + alreadyFinished: false, + ...executionResultFromSnapshot(newSnapshot), + }; + } + // finalizeRun is true — fall through to finish the run immediately } //not executing, so we will actually finish the run diff --git a/packages/cli-v3/src/apiClient.ts b/packages/cli-v3/src/apiClient.ts index e099260203f..693b48d992e 100644 --- a/packages/cli-v3/src/apiClient.ts +++ b/packages/cli-v3/src/apiClient.ts @@ -7,6 +7,8 @@ import { DevConfigResponseBody, DevDequeueRequestBody, DevDequeueResponseBody, + DevDisconnectRequestBody, + DevDisconnectResponseBody, EnvironmentVariableResponseBody, FailDeploymentRequestBody, FailDeploymentResponseBody, @@ -557,6 +559,7 @@ export class CliApiClient { heartbeatRun: this.devHeartbeatRun.bind(this), startRunAttempt: this.devStartRunAttempt.bind(this), completeRunAttempt: this.devCompleteRunAttempt.bind(this), + disconnect: this.devDisconnect.bind(this), setEngineURL: this.setEngineURL.bind(this), } as const; } @@ -681,6 +684,23 @@ export class CliApiClient { return eventSource; } + private async devDisconnect( + body: DevDisconnectRequestBody + ): Promise> { + if (!this.accessToken) { + throw new Error("devDisconnect: No access token"); + } + + return wrapZodFetch(DevDisconnectResponseBody, `${this.engineURL}/engine/v1/dev/disconnect`, { + method: "POST", + headers: { + Authorization: `Bearer ${this.accessToken}`, + Accept: "application/json", + }, + body: JSON.stringify(body), + }); + } + private async devDequeue( body: DevDequeueRequestBody ): Promise> { diff --git a/packages/cli-v3/src/dev/devSupervisor.ts b/packages/cli-v3/src/dev/devSupervisor.ts index 67da7e59458..a0c8afd2561 100644 --- a/packages/cli-v3/src/dev/devSupervisor.ts +++ b/packages/cli-v3/src/dev/devSupervisor.ts @@ -1,3 +1,7 @@ +import { spawn, type ChildProcess } from "node:child_process"; +import { readFileSync, writeFileSync, renameSync, unlinkSync, existsSync, mkdirSync } from "node:fs"; +import { join } from "node:path"; +import { fileURLToPath } from "node:url"; import { setTimeout as awaitTimeout } from "node:timers/promises"; import { BuildManifest, @@ -71,6 +75,11 @@ class DevSupervisor implements WorkerRuntime { private runLimiter?: ReturnType; private taskRunProcessPool?: TaskRunProcessPool; + /** Detached watchdog process that cancels runs if the CLI is killed */ + private watchdogProcess?: ChildProcess; + private activeRunsPath?: string; + private watchdogPidPath?: string; + constructor(public readonly options: WorkerRuntimeOptions) {} async init(): Promise { @@ -138,16 +147,22 @@ class DevSupervisor implements WorkerRuntime { //start an SSE connection for presence this.disconnectPresence = await this.#startPresenceConnection(); - // Handle SIGTERM to gracefully stop all run controllers + // Handle SIGTERM/SIGINT to gracefully stop all run controllers process.on("SIGTERM", this.#handleSigterm); + process.on("SIGINT", this.#handleSigterm); + + // Spawn detached watchdog to cancel runs if CLI is killed (e.g. pnpm SIGKILL) + this.#spawnWatchdog(); //start dequeuing await this.#dequeueRuns(); } #handleSigterm = async () => { - logger.debug("[DevSupervisor] Received SIGTERM, stopping all run controllers"); + logger.debug("[DevSupervisor] Received SIGTERM/SIGINT, stopping all run controllers"); + // The detached watchdog handles cancelling runs on the platform. + // Here we just stop local run controllers. const stopPromises = Array.from(this.runControllers.values()).map((controller) => controller.stop() ); @@ -157,6 +172,10 @@ class DevSupervisor implements WorkerRuntime { async shutdown(): Promise { process.off("SIGTERM", this.#handleSigterm); + process.off("SIGINT", this.#handleSigterm); + + // Kill watchdog on clean shutdown — no disconnect needed since runs are stopped locally + this.#killWatchdog(); this.disconnectPresence?.(); try { @@ -177,6 +196,97 @@ class DevSupervisor implements WorkerRuntime { } } + #spawnWatchdog() { + const triggerDir = join(this.options.config.workingDir, ".trigger"); + if (!existsSync(triggerDir)) { + mkdirSync(triggerDir, { recursive: true }); + } + + this.activeRunsPath = join(triggerDir, "active-runs.json"); + this.watchdogPidPath = join(triggerDir, "watchdog.pid"); + + // Write empty active-runs file + this.#updateActiveRunsFile(); + + // Resolve the compiled watchdog script path relative to this file + const thisDir = fileURLToPath(new URL(".", import.meta.url)); + const watchdogScript = join(thisDir, "devWatchdog.js"); + + if (!existsSync(watchdogScript)) { + logger.debug("[DevSupervisor] Watchdog script not found, skipping", { watchdogScript }); + return; + } + + try { + this.watchdogProcess = spawn(process.execPath, [watchdogScript], { + detached: true, + stdio: "ignore", + env: { + WATCHDOG_PARENT_PID: process.pid.toString(), + WATCHDOG_API_URL: this.options.client.apiURL, + WATCHDOG_API_KEY: this.options.client.accessToken ?? "", + WATCHDOG_ACTIVE_RUNS: this.activeRunsPath, + WATCHDOG_PID_FILE: this.watchdogPidPath, + }, + }); + + this.watchdogProcess.unref(); + + logger.debug("[DevSupervisor] Spawned watchdog", { + watchdogPid: this.watchdogProcess.pid, + parentPid: process.pid, + }); + } catch (error) { + logger.debug("[DevSupervisor] Failed to spawn watchdog", { error }); + } + } + + #killWatchdog() { + if (this.watchdogProcess?.pid) { + try { + process.kill(this.watchdogProcess.pid, "SIGTERM"); + } catch { + // Already dead + } + this.watchdogProcess = undefined; + } + + // Also try via PID file in case the process reference is stale + if (this.watchdogPidPath) { + try { + const pid = parseInt(readFileSync(this.watchdogPidPath, "utf8"), 10); + process.kill(pid, "SIGTERM"); + } catch { + // Already dead or no file + } + } + + // Clean up files + try { + if (this.activeRunsPath) unlinkSync(this.activeRunsPath); + } catch {} + try { + if (this.watchdogPidPath) unlinkSync(this.watchdogPidPath); + } catch {} + } + + #updateActiveRunsFile() { + if (!this.activeRunsPath) return; + + try { + const data = { + parentPid: process.pid, + runFriendlyIds: Array.from(this.runControllers.keys()), + }; + // Atomic write: write to temp file then rename to avoid corrupt reads + const tmpPath = this.activeRunsPath + ".tmp"; + writeFileSync(tmpPath, JSON.stringify(data)); + renameSync(tmpPath, this.activeRunsPath); + } catch (error) { + logger.debug("[DevSupervisor] Failed to update active-runs file", { error }); + } + } + async initializeWorker( manifest: BuildManifest, metafile: Metafile, @@ -386,6 +496,7 @@ class DevSupervisor implements WorkerRuntime { //stop the run controller, and remove it runController?.stop(); this.runControllers.delete(message.run.friendlyId); + this.#updateActiveRunsFile(); this.#unsubscribeFromRunNotifications(message.run.friendlyId); //stop the worker if it is deprecated and there are no more runs @@ -402,6 +513,7 @@ class DevSupervisor implements WorkerRuntime { }); this.runControllers.set(message.run.friendlyId, runController); + this.#updateActiveRunsFile(); if (this.runLimiter) { this.runLimiter(() => runController.start(message)).then(() => { diff --git a/packages/cli-v3/src/dev/devWatchdog.ts b/packages/cli-v3/src/dev/devWatchdog.ts new file mode 100644 index 00000000000..66dd197951b --- /dev/null +++ b/packages/cli-v3/src/dev/devWatchdog.ts @@ -0,0 +1,155 @@ +/** + * Dev Watchdog — a detached process that cancels in-flight runs when the dev CLI exits. + * + * Spawned by the dev CLI with `detached: true, stdio: "ignore", unref()`. + * Survives when pnpm sends SIGKILL to the CLI process tree. + * + * Lifecycle: + * 1. CLI spawns this script, passing config via env vars + * 2. Writes PID file for single-instance guarantee + * 3. Polls parent PID to detect when the CLI exits + * 4. On parent death: reads active-runs file → calls disconnect endpoint → exits + * + * Environment variables: + * WATCHDOG_PARENT_PID - The PID of the parent dev CLI process + * WATCHDOG_API_URL - The Trigger.dev API/engine URL + * WATCHDOG_API_KEY - The API key for authentication + * WATCHDOG_ACTIVE_RUNS - Path to the active-runs JSON file + * WATCHDOG_PID_FILE - Path to write the watchdog PID file + */ + +import { readFileSync, writeFileSync, unlinkSync, existsSync, mkdirSync } from "node:fs"; +import { dirname } from "node:path"; + +const POLL_INTERVAL_MS = 1000; + +// Safety timeout: if the watchdog has been running for 24 hours, exit regardless. +// Prevents zombie watchdogs from PID reuse scenarios. +const MAX_LIFETIME_MS = 24 * 60 * 60 * 1000; + +const parentPid = parseInt(process.env.WATCHDOG_PARENT_PID!, 10); +const apiUrl = process.env.WATCHDOG_API_URL!; +const apiKey = process.env.WATCHDOG_API_KEY!; +const activeRunsPath = process.env.WATCHDOG_ACTIVE_RUNS!; +const pidFilePath = process.env.WATCHDOG_PID_FILE!; + +if (!parentPid || !apiUrl || !apiKey || !activeRunsPath || !pidFilePath) { + process.exit(1); +} + +// Ensure directory exists +const dir = dirname(pidFilePath); +if (!existsSync(dir)) { + mkdirSync(dir, { recursive: true }); +} + +// Single instance: kill any existing watchdog +try { + const existingPid = parseInt(readFileSync(pidFilePath, "utf8"), 10); + if (existingPid && existingPid !== process.pid) { + try { + process.kill(existingPid, 0); // Check if alive + process.kill(existingPid, "SIGTERM"); // Kill it + } catch { + // Already dead + } + } +} catch { + // No PID file +} + +// Write our PID +writeFileSync(pidFilePath, process.pid.toString()); + +function cleanup() { + try { + unlinkSync(pidFilePath); + } catch {} + try { + unlinkSync(activeRunsPath); + } catch {} +} + +function isParentAlive(): boolean { + try { + process.kill(parentPid, 0); + return true; + } catch { + return false; + } +} + +function readActiveRuns(): string[] { + try { + const data = JSON.parse(readFileSync(activeRunsPath, "utf8")); + return data.runFriendlyIds ?? []; + } catch { + return []; + } +} + +async function callDisconnect(runFriendlyIds: string[]): Promise { + try { + await fetch(`${apiUrl}/engine/v1/dev/disconnect`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, + }, + body: JSON.stringify({ runFriendlyIds }), + signal: AbortSignal.timeout(10_000), + }); + } catch { + // Network error — runs will eventually time out via heartbeat + } +} + +async function onParentDied(): Promise { + const runFriendlyIds = readActiveRuns(); + + if (runFriendlyIds.length > 0) { + await callDisconnect(runFriendlyIds); + } + + cleanup(); + process.exit(0); +} + +// Guard against overlapping async callbacks +let checking = false; + +const interval = setInterval(async () => { + if (checking) return; + checking = true; + + try { + if (!isParentAlive()) { + clearInterval(interval); + clearTimeout(lifetimeTimeout); + await onParentDied(); + } + } finally { + checking = false; + } +}, POLL_INTERVAL_MS); + +// Safety timeout: exit after MAX_LIFETIME_MS to prevent zombie watchdogs +const lifetimeTimeout = setTimeout(() => { + clearInterval(interval); + cleanup(); + process.exit(0); +}, MAX_LIFETIME_MS); + +// Unref the timeout so it doesn't keep the process alive if the interval is cleared +lifetimeTimeout.unref(); + +// Clean exit on any termination signal +function handleSignal() { + clearInterval(interval); + clearTimeout(lifetimeTimeout); + cleanup(); + process.exit(0); +} + +process.on("SIGTERM", handleSignal); +process.on("SIGINT", handleSignal); diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index d42e6158096..dc72f155bdc 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -838,6 +838,17 @@ export const DevDequeueResponseBody = z.object({ }); export type DevDequeueResponseBody = z.infer; +export const DevDisconnectRequestBody = z.object({ + runFriendlyIds: z.string().array(), +}); +export type DevDisconnectRequestBody = z.infer; + +export const DevDisconnectResponseBody = z.object({ + cancelled: z.number(), + bulkActionId: z.string().optional(), +}); +export type DevDisconnectResponseBody = z.infer; + export type CreateUploadPayloadUrlResponseBody = z.infer; export const ReplayRunResponse = z.object({ From 07210123421d90c001f85f77379a4839b3c73cd7 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 7 Mar 2026 22:28:10 +0000 Subject: [PATCH 2/6] fix: address CodeRabbit review feedback - Reject requests exceeding 500 runs with 400 instead of silently truncating - Propagate finalizeRun through bulk action params so large batches also skip PENDING_CANCEL - Inherit parent process.env when spawning watchdog (TLS/proxy settings) - Add PID file prefix (trigger-watchdog:) to prevent killing unrelated processes on PID reuse --- .../app/routes/engine.v1.dev.disconnect.ts | 13 +++++++--- .../v3/services/bulk/BulkActionV2.server.ts | 5 +++- packages/cli-v3/src/dev/devSupervisor.ts | 9 +++++-- packages/cli-v3/src/dev/devWatchdog.ts | 25 +++++++++++-------- 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/apps/webapp/app/routes/engine.v1.dev.disconnect.ts b/apps/webapp/app/routes/engine.v1.dev.disconnect.ts index 4658700ad97..bd3eff5c45a 100644 --- a/apps/webapp/app/routes/engine.v1.dev.disconnect.ts +++ b/apps/webapp/app/routes/engine.v1.dev.disconnect.ts @@ -46,8 +46,14 @@ const { action } = createActionApiRoute( ); } - // Cap the number of runs - const runFriendlyIds = body.runFriendlyIds.slice(0, MAX_RUNS); + if (body.runFriendlyIds.length > MAX_RUNS) { + return json( + { error: `A maximum of ${MAX_RUNS} runs can be cancelled per request` }, + { status: 400 } + ); + } + + const { runFriendlyIds } = body; if (runFriendlyIds.length === 0) { return json({ cancelled: 0 }, { status: 200 }); @@ -56,7 +62,6 @@ const { action } = createActionApiRoute( logger.info("Dev disconnect: cancelling runs", { environmentId, runCount: runFriendlyIds.length, - capped: body.runFriendlyIds.length > MAX_RUNS, }); // For small numbers of runs, cancel inline @@ -150,7 +155,7 @@ async function createBulkCancelAction( environmentId, name: "Dev session disconnect", type: BulkActionType.CANCEL, - params: { runId: runFriendlyIds }, + params: { runId: runFriendlyIds, finalizeRun: true }, queryName: "bulk_action_v1", totalCount: runFriendlyIds.length, completionNotification: BulkActionNotificationType.NONE, diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 98b6079c108..4ca558bc2b7 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -138,11 +138,13 @@ export class BulkActionService extends BaseService { } // 2. Parse the params + const rawParams = group.params && typeof group.params === "object" ? group.params : {}; + const finalizeRun = "finalizeRun" in rawParams && (rawParams as any).finalizeRun === true; const filters = parseRunListInputOptions({ organizationId: group.project.organizationId, projectId: group.projectId, environmentId: group.environmentId, - ...(group.params && typeof group.params === "object" ? group.params : {}), + ...rawParams, }); const runsRepository = new RunsRepository({ @@ -199,6 +201,7 @@ export class BulkActionService extends BaseService { cancelService.call(run, { reason: `Bulk action ${group.friendlyId} cancelled run`, bulkActionId: bulkActionId, + finalizeRun, }) ); if (error) { diff --git a/packages/cli-v3/src/dev/devSupervisor.ts b/packages/cli-v3/src/dev/devSupervisor.ts index a0c8afd2561..79d5df8b8e7 100644 --- a/packages/cli-v3/src/dev/devSupervisor.ts +++ b/packages/cli-v3/src/dev/devSupervisor.ts @@ -222,6 +222,7 @@ class DevSupervisor implements WorkerRuntime { detached: true, stdio: "ignore", env: { + ...process.env, WATCHDOG_PARENT_PID: process.pid.toString(), WATCHDOG_API_URL: this.options.client.apiURL, WATCHDOG_API_KEY: this.options.client.accessToken ?? "", @@ -254,8 +255,12 @@ class DevSupervisor implements WorkerRuntime { // Also try via PID file in case the process reference is stale if (this.watchdogPidPath) { try { - const pid = parseInt(readFileSync(this.watchdogPidPath, "utf8"), 10); - process.kill(pid, "SIGTERM"); + const content = readFileSync(this.watchdogPidPath, "utf8"); + const prefix = "trigger-watchdog:"; + if (content.startsWith(prefix)) { + const pid = parseInt(content.slice(prefix.length), 10); + if (pid) process.kill(pid, "SIGTERM"); + } } catch { // Already dead or no file } diff --git a/packages/cli-v3/src/dev/devWatchdog.ts b/packages/cli-v3/src/dev/devWatchdog.ts index 66dd197951b..80ff56bb045 100644 --- a/packages/cli-v3/src/dev/devWatchdog.ts +++ b/packages/cli-v3/src/dev/devWatchdog.ts @@ -43,23 +43,28 @@ if (!existsSync(dir)) { mkdirSync(dir, { recursive: true }); } +const PID_FILE_PREFIX = "trigger-watchdog:"; + // Single instance: kill any existing watchdog try { - const existingPid = parseInt(readFileSync(pidFilePath, "utf8"), 10); - if (existingPid && existingPid !== process.pid) { - try { - process.kill(existingPid, 0); // Check if alive - process.kill(existingPid, "SIGTERM"); // Kill it - } catch { - // Already dead + const pidFileContent = readFileSync(pidFilePath, "utf8"); + if (pidFileContent.startsWith(PID_FILE_PREFIX)) { + const existingPid = parseInt(pidFileContent.slice(PID_FILE_PREFIX.length), 10); + if (existingPid && existingPid !== process.pid) { + try { + process.kill(existingPid, 0); // Check if alive + process.kill(existingPid, "SIGTERM"); // Kill it + } catch { + // Already dead + } } } } catch { - // No PID file + // No PID file or invalid format } -// Write our PID -writeFileSync(pidFilePath, process.pid.toString()); +// Write our PID with prefix so we can verify ownership later +writeFileSync(pidFilePath, `${PID_FILE_PREFIX}${process.pid}`); function cleanup() { try { From e94add93411a3173ed064786ffd369539fb2e7bc Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 8 Mar 2026 18:51:01 +0000 Subject: [PATCH 3/6] fix(cli): check response.ok and retry disconnect with exponential backoff --- packages/cli-v3/src/dev/devWatchdog.ts | 40 +++++++++++++++++--------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/packages/cli-v3/src/dev/devWatchdog.ts b/packages/cli-v3/src/dev/devWatchdog.ts index 80ff56bb045..b3ccb782189 100644 --- a/packages/cli-v3/src/dev/devWatchdog.ts +++ b/packages/cli-v3/src/dev/devWatchdog.ts @@ -94,26 +94,40 @@ function readActiveRuns(): string[] { } async function callDisconnect(runFriendlyIds: string[]): Promise { - try { - await fetch(`${apiUrl}/engine/v1/dev/disconnect`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${apiKey}`, - }, - body: JSON.stringify({ runFriendlyIds }), - signal: AbortSignal.timeout(10_000), - }); - } catch { - // Network error — runs will eventually time out via heartbeat + const response = await fetch(`${apiUrl}/engine/v1/dev/disconnect`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, + }, + body: JSON.stringify({ runFriendlyIds }), + signal: AbortSignal.timeout(10_000), + }); + + if (!response.ok) { + throw new Error(`Disconnect failed with status ${response.status}`); } } +const MAX_DISCONNECT_ATTEMPTS = 5; +const INITIAL_BACKOFF_MS = 500; + async function onParentDied(): Promise { const runFriendlyIds = readActiveRuns(); if (runFriendlyIds.length > 0) { - await callDisconnect(runFriendlyIds); + for (let attempt = 0; attempt < MAX_DISCONNECT_ATTEMPTS; attempt++) { + try { + await callDisconnect(runFriendlyIds); + break; + } catch { + if (attempt < MAX_DISCONNECT_ATTEMPTS - 1) { + const backoff = INITIAL_BACKOFF_MS * 2 ** attempt; + await new Promise((resolve) => setTimeout(resolve, backoff)); + } + // Final attempt failed — runs will eventually time out via heartbeat + } + } } cleanup(); From cb1cc61c4762cbdf728ef39273edc5a40b67e004 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 9 Mar 2026 11:02:38 +0000 Subject: [PATCH 4/6] fix(cli): use engineUrl for watchdog disconnect, exit on SIGINT --- packages/cli-v3/src/dev/devSupervisor.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/cli-v3/src/dev/devSupervisor.ts b/packages/cli-v3/src/dev/devSupervisor.ts index 79d5df8b8e7..fd9b6e38e7f 100644 --- a/packages/cli-v3/src/dev/devSupervisor.ts +++ b/packages/cli-v3/src/dev/devSupervisor.ts @@ -168,6 +168,10 @@ class DevSupervisor implements WorkerRuntime { ); await Promise.allSettled(stopPromises); + + // Must exit explicitly since registering a custom SIGINT handler + // overrides Node's default process termination behavior. + process.exit(0); }; async shutdown(): Promise { @@ -224,7 +228,7 @@ class DevSupervisor implements WorkerRuntime { env: { ...process.env, WATCHDOG_PARENT_PID: process.pid.toString(), - WATCHDOG_API_URL: this.options.client.apiURL, + WATCHDOG_API_URL: this.config?.engineUrl ?? this.options.client.apiURL, WATCHDOG_API_KEY: this.options.client.accessToken ?? "", WATCHDOG_ACTIVE_RUNS: this.activeRunsPath, WATCHDOG_PID_FILE: this.watchdogPidPath, From 09ac03ac91ba3c064d630cb5b0e2e7c19a2ed097 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 9 Mar 2026 11:24:53 +0000 Subject: [PATCH 5/6] fix(cli): unify shutdown sequence, guard PID-file fallback against reuse --- packages/cli-v3/src/dev/devSupervisor.ts | 27 ++++++++++++++---------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/packages/cli-v3/src/dev/devSupervisor.ts b/packages/cli-v3/src/dev/devSupervisor.ts index fd9b6e38e7f..70a884283fa 100644 --- a/packages/cli-v3/src/dev/devSupervisor.ts +++ b/packages/cli-v3/src/dev/devSupervisor.ts @@ -161,13 +161,7 @@ class DevSupervisor implements WorkerRuntime { #handleSigterm = async () => { logger.debug("[DevSupervisor] Received SIGTERM/SIGINT, stopping all run controllers"); - // The detached watchdog handles cancelling runs on the platform. - // Here we just stop local run controllers. - const stopPromises = Array.from(this.runControllers.values()).map((controller) => - controller.stop() - ); - - await Promise.allSettled(stopPromises); + await this.shutdown(); // Must exit explicitly since registering a custom SIGINT handler // overrides Node's default process termination behavior. @@ -178,6 +172,12 @@ class DevSupervisor implements WorkerRuntime { process.off("SIGTERM", this.#handleSigterm); process.off("SIGINT", this.#handleSigterm); + // Stop all local run controllers first so active-runs.json is up-to-date + const stopPromises = Array.from(this.runControllers.values()).map((controller) => + controller.stop() + ); + await Promise.allSettled(stopPromises); + // Kill watchdog on clean shutdown — no disconnect needed since runs are stopped locally this.#killWatchdog(); @@ -247,23 +247,28 @@ class DevSupervisor implements WorkerRuntime { } #killWatchdog() { - if (this.watchdogProcess?.pid) { + const knownPid = this.watchdogProcess?.pid; + + if (knownPid) { try { - process.kill(this.watchdogProcess.pid, "SIGTERM"); + process.kill(knownPid, "SIGTERM"); } catch { // Already dead } this.watchdogProcess = undefined; } - // Also try via PID file in case the process reference is stale + // Fallback: try via PID file, but only if the PID matches our spawned watchdog + // to avoid killing an unrelated process that reused a stale PID if (this.watchdogPidPath) { try { const content = readFileSync(this.watchdogPidPath, "utf8"); const prefix = "trigger-watchdog:"; if (content.startsWith(prefix)) { const pid = parseInt(content.slice(prefix.length), 10); - if (pid) process.kill(pid, "SIGTERM"); + if (pid && (!knownPid || pid === knownPid)) { + process.kill(pid, "SIGTERM"); + } } } catch { // Already dead or no file From 245c6ade9e06b05ad8ac4ef2f17fb3f1716f4ea0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 9 Mar 2026 11:56:21 +0000 Subject: [PATCH 6/6] fix: restrict dev disconnect endpoint to DEVELOPMENT environments --- apps/webapp/app/routes/engine.v1.dev.disconnect.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/webapp/app/routes/engine.v1.dev.disconnect.ts b/apps/webapp/app/routes/engine.v1.dev.disconnect.ts index bd3eff5c45a..0cf92a53b70 100644 --- a/apps/webapp/app/routes/engine.v1.dev.disconnect.ts +++ b/apps/webapp/app/routes/engine.v1.dev.disconnect.ts @@ -35,6 +35,12 @@ const { action } = createActionApiRoute( method: "POST", }, async ({ authentication, body }) => { + // Only allow dev environments — this endpoint uses finalizeRun which + // skips PENDING_CANCEL and immediately finalizes executing runs. + if (authentication.environment.type !== "DEVELOPMENT") { + return json({ error: "This endpoint is only available for dev environments" }, { status: 403 }); + } + const environmentId = authentication.environment.id; // Rate limit per environment