diff --git a/.changeset/fix-local-build-load.md b/.changeset/fix-local-build-load.md new file mode 100644 index 00000000000..13f91da9d6a --- /dev/null +++ b/.changeset/fix-local-build-load.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Fix `--load` flag being silently ignored on local/self-hosted builds. diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index faf34bcd025..73d703da2fe 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -3,142 +3,170 @@ import { env as stdEnv } from "std-env"; import { z } from "zod"; import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; -const Env = z.object({ - // This will come from `spec.nodeName` in k8s - TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), - TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), - - // Required settings - TRIGGER_API_URL: z.string().url(), - TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file - MANAGED_WORKER_SECRET: z.string(), - OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners - - // Workload API settings (coordinator mode) - the workload API is what the run controller connects to - TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true), - TRIGGER_WORKLOAD_API_PROTOCOL: z - .string() - .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) - .default("http"), - TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default - TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), - TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on - TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller - - // Runner settings - RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), - RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), - 
RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) - RUNNER_PRETTY_LOGS: BoolEnv.default(false), - - // Dequeue settings (provider mode) - TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), - TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), - TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), - TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), - TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), - TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds - TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds - TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) - TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) - TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) - TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) - - // Optional services - TRIGGER_WARM_START_URL: z.string().optional(), - TRIGGER_CHECKPOINT_URL: z.string().optional(), - TRIGGER_METADATA_URL: z.string().optional(), - - // Used by the resource monitor - RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), - RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), - RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), - - // Docker settings - DOCKER_API_VERSION: z.string().optional(), - DOCKER_PLATFORM: z.string().optional(), // e.g. 
linux/amd64, linux/arm64 - DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true), - DOCKER_REGISTRY_USERNAME: z.string().optional(), - DOCKER_REGISTRY_PASSWORD: z.string().optional(), - DOCKER_REGISTRY_URL: z.string().optional(), // e.g. https://index.docker.io/v1 - DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true), - DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true), - /** - * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. - * Any other value is taken as a custom network's name to which all runners should connect to. - * - * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. - * - * **WARNING**: Specifying multiple networks will slightly increase startup times. - * - * @default "host" - */ - DOCKER_RUNNER_NETWORKS: z.string().default("host"), - - // Kubernetes settings - KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), - KUBERNETES_NAMESPACE: z.string().default("default"), - KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), - KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv - KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"), - KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), - KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false), - KUBERNETES_CPU_REQUEST_MIN_CORES: z.coerce.number().min(0).default(0), - KUBERNETES_CPU_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit - KUBERNETES_MEMORY_REQUEST_MIN_GB: z.coerce.number().min(0).default(0), - KUBERNETES_MEMORY_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(1), // Ratio of memory limit, so 1 = 100% of memory limit - - // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO - KUBERNETES_CPU_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - 
KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO - KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB - KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods - KUBERNETES_LARGE_MACHINE_POOL_LABEL: z.string().optional(), // if set, large-* presets affinity for machinepool= - - // Project affinity settings - pods from the same project prefer the same node - KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), - KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"), - - // Placement tags settings - PLACEMENT_TAGS_ENABLED: BoolEnv.default(false), - PLACEMENT_TAGS_PREFIX: 
// Small schema factories for the shapes that repeat throughout the environment schema.
// Each call returns a fresh zod schema, so every key still gets its own instance.

/** Optional free-form string. */
const optionalString = () => z.string().optional();
/** Optional number, coerced from the raw environment string. */
const optionalNumber = () => z.coerce.number().optional();
/** Coerced integer that falls back to `fallback` when the variable is unset. */
const intWithDefault = (fallback: number) => z.coerce.number().int().default(fallback);
/** Coerced number that must be >= 0. */
const nonNegative = () => z.coerce.number().min(0);
/** Coerced number restricted to the closed interval [0, 1]. */
const unitInterval = () => z.coerce.number().min(0).max(1);

/**
 * Variables that must be set whenever COMPUTE_SNAPSHOTS_ENABLED is true,
 * listed in the order their validation issues are reported.
 */
const snapshotPrerequisites = [
  {
    name: "TRIGGER_METADATA_URL",
    message: "TRIGGER_METADATA_URL is required when COMPUTE_SNAPSHOTS_ENABLED is true",
  },
  {
    name: "TRIGGER_WORKLOAD_API_DOMAIN",
    message: "TRIGGER_WORKLOAD_API_DOMAIN is required when COMPUTE_SNAPSHOTS_ENABLED is true",
  },
] as const;

/**
 * Schema for the supervisor's environment variables.
 *
 * Per-key rules live in the object schema; the `superRefine` step adds the
 * cross-field rule that compute snapshots need both the metadata URL and the
 * workload API domain.
 */
const Env = z
  .object({
    // This will come from `spec.nodeName` in k8s
    TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),
    TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),

    // Required settings
    TRIGGER_API_URL: z.string().url(),
    TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file
    MANAGED_WORKER_SECRET: z.string(),
    OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners

    // Workload API settings (coordinator mode) - the workload API is what the run controller connects to
    TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true),
    TRIGGER_WORKLOAD_API_PROTOCOL: z
      .string()
      .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase()))
      .default("http"),
    TRIGGER_WORKLOAD_API_DOMAIN: optionalString(), // If unset, will use orchestrator-specific default
    TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"),
    TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // Port the workload API listens on
    TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // Exposed port passed to the run controller

    // Runner settings
    RUNNER_HEARTBEAT_INTERVAL_SECONDS: optionalNumber(),
    RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: optionalNumber(),
    RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv)
    RUNNER_PRETTY_LOGS: BoolEnv.default(false),

    // Dequeue settings (provider mode)
    TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true),
    TRIGGER_DEQUEUE_INTERVAL_MS: intWithDefault(250),
    TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: intWithDefault(1000),
    TRIGGER_DEQUEUE_MAX_RUN_COUNT: intWithDefault(1),
    TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: intWithDefault(1),
    TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: intWithDefault(10),
    TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"),
    TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: intWithDefault(5000), // 5 seconds
    TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: intWithDefault(30000), // 30 seconds
    // Target ratio of queue items to consumers (1.0 = 1 item per consumer)
    TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0),
    // Smooths queue length measurements (0=historical, 1=current)
    TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: unitInterval().default(0.3),
    // Batch window for metrics processing (ms)
    TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000),
    // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate)
    TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: unitInterval().default(0.7),

    // Optional services
    TRIGGER_WARM_START_URL: optionalString(),
    TRIGGER_CHECKPOINT_URL: optionalString(),
    TRIGGER_METADATA_URL: optionalString(),

    // Used by the resource monitor
    RESOURCE_MONITOR_ENABLED: BoolEnv.default(false),
    RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: optionalNumber(),
    RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: optionalNumber(),

    // Docker settings
    DOCKER_API_VERSION: optionalString(),
    DOCKER_PLATFORM: optionalString(), // e.g. linux/amd64, linux/arm64
    DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true),
    DOCKER_REGISTRY_USERNAME: optionalString(),
    DOCKER_REGISTRY_PASSWORD: optionalString(),
    DOCKER_REGISTRY_URL: optionalString(), // e.g. https://index.docker.io/v1
    DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true),
    DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true),
    /**
     * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`,
     * and `container:<name|id>`. Any other value is taken as the name of a custom network to which
     * all runners should connect.
     *
     * Accepts a list of comma-separated values to attach to multiple networks. Additional networks
     * are interpreted as network names and will be attached after container creation.
     *
     * **WARNING**: Specifying multiple networks will slightly increase startup times.
     *
     * @default "host"
     */
    DOCKER_RUNNER_NETWORKS: z.string().default("host"),

    // Compute settings
    COMPUTE_GATEWAY_URL: z.string().url().optional(),
    COMPUTE_GATEWAY_AUTH_TOKEN: optionalString(),
    COMPUTE_GATEWAY_TIMEOUT_MS: intWithDefault(30_000),
    COMPUTE_SNAPSHOTS_ENABLED: BoolEnv.default(false),
    COMPUTE_SNAPSHOT_DELAY_MS: z.coerce.number().int().min(0).max(60_000).default(5_000),

    // Kubernetes settings
    KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
    KUBERNETES_NAMESPACE: z.string().default("default"),
    KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"),
    KUBERNETES_IMAGE_PULL_SECRETS: optionalString(), // csv
    KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
    KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),
    KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false),
    KUBERNETES_CPU_REQUEST_MIN_CORES: nonNegative().default(0),
    KUBERNETES_CPU_REQUEST_RATIO: unitInterval().default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit
    KUBERNETES_MEMORY_REQUEST_MIN_GB: nonNegative().default(0),
    KUBERNETES_MEMORY_REQUEST_RATIO: unitInterval().default(1), // Ratio of memory limit, so 1 = 100% of memory limit

    // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO
    KUBERNETES_CPU_REQUEST_RATIO_MICRO: unitInterval().optional(),
    KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: unitInterval().optional(),
    KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: unitInterval().optional(),
    KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: unitInterval().optional(),
    KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: unitInterval().optional(),
    KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: unitInterval().optional(),
    KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: unitInterval().optional(),

    // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO
    KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: unitInterval().optional(),
    KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: unitInterval().optional(),
    KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: unitInterval().optional(),
    KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: unitInterval().optional(),
    KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: unitInterval().optional(),
    KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: unitInterval().optional(),
    KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: unitInterval().optional(),

    KUBERNETES_MEMORY_OVERHEAD_GB: nonNegative().optional(), // Optional memory overhead to add to the limit in GB
    KUBERNETES_SCHEDULER_NAME: optionalString(), // Custom scheduler name for pods
    KUBERNETES_LARGE_MACHINE_POOL_LABEL: optionalString(), // if set, large-* presets use affinity for machinepool=<value>

    // Project affinity settings - pods from the same project prefer the same node
    KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false),
    KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50),
    KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z
      .string()
      .trim()
      .min(1)
      .default("kubernetes.io/hostname"),

    // Placement tags settings
    PLACEMENT_TAGS_ENABLED: BoolEnv.default(false),
    PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"),

    // Metrics
    METRICS_ENABLED: BoolEnv.default(true),
    METRICS_COLLECT_DEFAULTS: BoolEnv.default(true),
    METRICS_HOST: z.string().default("127.0.0.1"),
    METRICS_PORT: intWithDefault(9090),

    // Pod cleaner
    POD_CLEANER_ENABLED: BoolEnv.default(true),
    POD_CLEANER_INTERVAL_MS: intWithDefault(10000),
    POD_CLEANER_BATCH_SIZE: intWithDefault(500),

    // Failed pod handler
    FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true),
    FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: intWithDefault(1000),

    // Debug
    DEBUG: BoolEnv.default(false),
    SEND_RUN_DEBUG_LOGS: BoolEnv.default(false),
  })
  .superRefine((data, ctx) => {
    // The cross-field rule only applies once snapshots are switched on.
    if (!data.COMPUTE_SNAPSHOTS_ENABLED) {
      return;
    }

    for (const { name, message } of snapshotPrerequisites) {
      if (data[name]) {
        continue;
      }
      // Attach the issue to the missing variable so the error points at the right key.
      ctx.addIssue({ code: z.ZodIssueCode.custom, message, path: [name] });
    }
  });

/** Parsed and validated environment; `Env.parse` throws at import time when validation fails. */
export const env = Env.parse(stdEnv);
import { DockerWorkloadManager } from "./workloadManager/docker.js"; +import { ComputeWorkloadManager } from "./workloadManager/compute.js"; import { HttpServer, CheckpointClient, @@ -35,9 +36,11 @@ class ManagedSupervisor { private readonly metricsServer?: HttpServer; private readonly workloadServer: WorkloadServer; private readonly workloadManager: WorkloadManager; + private readonly computeManager?: ComputeWorkloadManager; private readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; + private readonly isComputeMode: boolean; private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; @@ -77,9 +80,22 @@ class ManagedSupervisor { : new DockerResourceMonitor(new Docker()) : new NoopResourceMonitor(); - this.workloadManager = this.isKubernetes - ? new KubernetesWorkloadManager(workloadManagerOptions) - : new DockerWorkloadManager(workloadManagerOptions); + this.isComputeMode = !!env.COMPUTE_GATEWAY_URL; + + if (env.COMPUTE_GATEWAY_URL) { + const computeManager = new ComputeWorkloadManager({ + ...workloadManagerOptions, + gatewayUrl: env.COMPUTE_GATEWAY_URL, + gatewayAuthToken: env.COMPUTE_GATEWAY_AUTH_TOKEN, + gatewayTimeoutMs: env.COMPUTE_GATEWAY_TIMEOUT_MS, + }); + this.computeManager = computeManager; + this.workloadManager = computeManager; + } else { + this.workloadManager = this.isKubernetes + ? 
new KubernetesWorkloadManager(workloadManagerOptions) + : new DockerWorkloadManager(workloadManagerOptions); + } if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { @@ -187,7 +203,7 @@ class ManagedSupervisor { this.workloadServer.notifyRun({ run }); }); - this.workerSession.on("runQueueMessage", async ({ time, message }) => { + this.workerSession.on("runQueueMessage", async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => { this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message); if (message.completedWaitpoints.length > 0) { @@ -207,6 +223,33 @@ class ManagedSupervisor { if (checkpoint) { this.logger.log("Restoring run", { runId: message.run.id }); + if (this.isComputeMode && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + try { + // Derive runnerId unique per restore cycle (matches iceman's pattern) + const runIdShort = message.run.friendlyId.replace("run_", ""); + const checkpointSuffix = checkpoint.id.slice(-8); + const runnerId = `runner-${runIdShort}-${checkpointSuffix}`; + + const didRestore = await this.computeManager.restore({ + snapshotId: checkpoint.location, + runnerId, + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + machine: message.run.machine, + }); + + if (didRestore) { + this.logger.log("Compute restore successful", { runId: message.run.id, runnerId }); + } else { + this.logger.error("Compute restore failed", { runId: message.run.id, runnerId }); + } + } catch (error) { + this.logger.error("Failed to restore run (compute)", { error }); + } + + return; + } + if (!this.checkpointClient) { this.logger.error("No checkpoint client", { runId: message.run.id }); return; @@ -236,7 +279,9 @@ class ManagedSupervisor { this.logger.log("Scheduling run", { runId: message.run.id }); + const warmStartStart = performance.now(); const didWarmStart = await this.tryWarmStart(message); + const warmStartCheckMs = Math.round(performance.now() - warmStartStart); if 
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { TimerWheel } from "./timerWheel.js";

const DEFAULT_DELAY_MS = 3000;

/**
 * Builds a wheel whose expiry callback records each expired key, in dispatch order.
 * The wheel is returned un-started so each test controls its own lifecycle.
 */
function recordingWheel(delayMs: number = DEFAULT_DELAY_MS) {
  const expiredKeys: string[] = [];
  const wheel = new TimerWheel<string>({
    delayMs,
    onExpire: ({ key }) => {
      expiredKeys.push(key);
    },
  });
  return { wheel, expiredKeys };
}

describe("TimerWheel", () => {
  // Every test drives the wheel's interval through vitest's fake clock.
  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("dispatches item after delay", () => {
    const { wheel, expiredKeys } = recordingWheel();
    wheel.start();
    wheel.submit("run-1", "snapshot-data");

    // t=2900: still pending
    vi.advanceTimersByTime(2900);
    expect(expiredKeys).toEqual([]);

    // t=3100: delay has elapsed
    vi.advanceTimersByTime(200);
    expect(expiredKeys).toEqual(["run-1"]);

    wheel.stop();
  });

  it("cancels item before it fires", () => {
    const { wheel, expiredKeys } = recordingWheel();
    wheel.start();
    wheel.submit("run-1", "data");

    vi.advanceTimersByTime(1000);
    const wasCancelled = wheel.cancel("run-1");
    expect(wasCancelled).toBe(true);

    vi.advanceTimersByTime(5000);
    expect(expiredKeys).toEqual([]);
    expect(wheel.size).toBe(0);

    wheel.stop();
  });

  it("cancel returns false for unknown key", () => {
    const { wheel } = recordingWheel();
    expect(wheel.cancel("nonexistent")).toBe(false);
  });

  it("deduplicates: resubmitting same key replaces the entry", () => {
    // This test also needs the payload, so it records full items rather than just keys.
    const expired: { key: string; data: string }[] = [];
    const wheel = new TimerWheel<string>({
      delayMs: DEFAULT_DELAY_MS,
      onExpire: ({ key, data }) => {
        expired.push({ key, data });
      },
    });

    wheel.start();
    wheel.submit("run-1", "old-data");

    vi.advanceTimersByTime(1000);
    wheel.submit("run-1", "new-data");

    // The first submission would have fired at t=3000 but was replaced;
    // the replacement is due at t=1000+3000=4000.
    vi.advanceTimersByTime(2100);
    expect(expired).toEqual([]);

    vi.advanceTimersByTime(1000);
    expect(expired).toEqual([{ key: "run-1", data: "new-data" }]);

    wheel.stop();
  });

  it("handles many concurrent items", () => {
    const { wheel, expiredKeys } = recordingWheel();
    wheel.start();

    const itemCount = 1000;
    for (let n = 0; n < itemCount; n += 1) {
      wheel.submit(`run-${n}`, `data-${n}`);
    }
    expect(wheel.size).toBe(itemCount);

    vi.advanceTimersByTime(3100);
    expect(expiredKeys.length).toBe(itemCount);
    expect(wheel.size).toBe(0);

    wheel.stop();
  });

  it("handles items submitted at different times", () => {
    const { wheel, expiredKeys } = recordingWheel();
    wheel.start();

    // Submissions staggered one second apart: t=0, t=1000, t=2000.
    wheel.submit("run-1", "data");
    vi.advanceTimersByTime(1000);
    wheel.submit("run-2", "data");
    vi.advanceTimersByTime(1000);
    wheel.submit("run-3", "data");

    // t=2000: nothing is due yet
    expect(expiredKeys).toEqual([]);

    // t=3100: run-1
    vi.advanceTimersByTime(1100);
    expect(expiredKeys).toEqual(["run-1"]);

    // t=4100: run-2
    vi.advanceTimersByTime(1000);
    expect(expiredKeys).toEqual(["run-1", "run-2"]);

    // t=5100: run-3
    vi.advanceTimersByTime(1000);
    expect(expiredKeys).toEqual(["run-1", "run-2", "run-3"]);

    wheel.stop();
  });

  it("setDelay changes delay for new items only", () => {
    const { wheel, expiredKeys } = recordingWheel();
    wheel.start();

    wheel.submit("run-1", "data"); // queued with the 3s delay

    vi.advanceTimersByTime(500);
    wheel.setDelay(1000);
    wheel.submit("run-2", "data"); // queued with the 1s delay

    // t=1600: run-2 (submitted at t=500 with 1s delay) has fired
    vi.advanceTimersByTime(1100);
    expect(expiredKeys).toEqual(["run-2"]);

    // t=3100: run-1 fires on its original 3s schedule
    vi.advanceTimersByTime(1500);
    expect(expiredKeys).toEqual(["run-2", "run-1"]);

    wheel.stop();
  });

  it("stop returns unprocessed items", () => {
    const { wheel, expiredKeys } = recordingWheel();
    wheel.start();

    for (const n of [1, 2, 3]) {
      wheel.submit(`run-${n}`, `data-${n}`);
    }

    const leftovers = wheel.stop();
    expect(expiredKeys).toEqual([]);
    expect(wheel.size).toBe(0);
    expect(leftovers.length).toBe(3);

    const leftoverKeys = leftovers.map((item) => item.key).sort();
    expect(leftoverKeys).toEqual(["run-1", "run-2", "run-3"]);
    expect(leftovers.find((item) => item.key === "run-1")?.data).toBe("data-1");
  });

  it("after stop, new submissions are silently dropped", () => {
    const { wheel, expiredKeys } = recordingWheel();
    wheel.start();
    wheel.stop();

    wheel.submit("run-late", "data");
    expect(expiredKeys).toEqual([]);
    expect(wheel.size).toBe(0);
  });

  it("tracks size correctly through submit/cancel/dispatch", () => {
    const { wheel } = recordingWheel();
    wheel.start();

    wheel.submit("a", "data");
    wheel.submit("b", "data");
    expect(wheel.size).toBe(2);

    wheel.cancel("a");
    expect(wheel.size).toBe(1);

    vi.advanceTimersByTime(3100);
    expect(wheel.size).toBe(0);

    wheel.stop();
  });

  it("clamps delay to valid range", () => {
    // A zero delay is raised to the minimum of one tick (100ms).
    const { wheel, expiredKeys } = recordingWheel(0);
    wheel.start();
    wheel.submit("run-1", "data");

    vi.advanceTimersByTime(200);
    expect(expiredKeys).toEqual(["run-1"]);

    wheel.stop();
  });

  it("multiple cancel calls are safe", () => {
    const { wheel } = recordingWheel();
    wheel.start();
    wheel.submit("run-1", "data");

    const firstCancel = wheel.cancel("run-1");
    const secondCancel = wheel.cancel("run-1");
    expect(firstCancel).toBe(true);
    expect(secondCancel).toBe(false);

    wheel.stop();
  });
});
/**
 * TimerWheel implements a hashed timer wheel for managing large numbers of
 * delayed operations with O(1) submit, cancel, and per-item dispatch.
 *
 * Used by the supervisor to delay snapshot requests so that short-lived waitpoints
 * (e.g. triggerAndWait that resolves in <3s) skip the snapshot entirely.
 *
 * The wheel is a ring buffer of slots. A single setInterval advances a cursor.
 * When the cursor reaches a slot, all items in that slot are dispatched.
 *
 * Delays are counted in ticks, not wall-clock time: an item submitted between two
 * ticks fires on the tick `delaySlots` steps ahead of the cursor, so the observed
 * delay can be up to one tick (100ms) shorter than the configured value.
 *
 * Fixed capacity: 600 slots at 100ms tick = 60s max delay.
 */

const TICK_MS = 100;
const NUM_SLOTS = 600; // 60s max delay at 100ms tick

export type TimerWheelItem<T> = {
  key: string;
  data: T;
};

export type TimerWheelOptions<T> = {
  /** Called when an item's delay expires. */
  onExpire: (item: TimerWheelItem<T>) => void;
  /** Delay in milliseconds before items fire. Clamped to [100, 60000]. NaN is rejected. */
  delayMs: number;
};

type Entry<T> = {
  key: string;
  data: T;
  slotIndex: number;
};

/**
 * Convert a delay in milliseconds to a whole number of wheel slots, clamped to
 * [1, NUM_SLOTS].
 *
 * @throws RangeError if `delayMs` is NaN. Clamping cannot repair NaN, and a NaN
 *   slot count would otherwise only surface later as an invalid slot index in submit().
 */
function toDelaySlots(delayMs: number): number {
  if (Number.isNaN(delayMs)) {
    throw new RangeError(`TimerWheel: delayMs must be a number, got ${delayMs}`);
  }
  return Math.max(1, Math.min(NUM_SLOTS, Math.round(delayMs / TICK_MS)));
}

export class TimerWheel<T> {
  private slots: Set<string>[];
  private entries: Map<string, Entry<T>>;
  private cursor: number;
  private intervalId: ReturnType<typeof setInterval> | null;
  private onExpire: (item: TimerWheelItem<T>) => void;
  private delaySlots: number;

  /**
   * @param opts Expiry callback and the initial delay.
   * @throws RangeError if `opts.delayMs` is NaN.
   */
  constructor(opts: TimerWheelOptions<T>) {
    this.slots = Array.from({ length: NUM_SLOTS }, () => new Set<string>());
    this.entries = new Map();
    this.cursor = 0;
    this.intervalId = null;
    this.onExpire = opts.onExpire;
    this.delaySlots = toDelaySlots(opts.delayMs);
  }

  /** Start the timer wheel. Must be called before submitting items. No-op if already running. */
  start(): void {
    if (this.intervalId) return;
    this.intervalId = setInterval(() => this.tick(), TICK_MS);
    // Don't hold the process open just for the timer wheel
    if (this.intervalId && typeof this.intervalId === "object" && "unref" in this.intervalId) {
      this.intervalId.unref();
    }
  }

  /**
   * Stop the timer wheel and return all unprocessed items.
   * The wheel keeps running normally during graceful shutdown - call stop()
   * only when you're ready to tear down. Caller decides what to do with leftovers.
   */
  stop(): TimerWheelItem<T>[] {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }

    const remaining: TimerWheelItem<T>[] = [];
    for (const [key, entry] of this.entries) {
      remaining.push({ key, data: entry.data });
    }

    for (const slot of this.slots) {
      slot.clear();
    }
    this.entries.clear();

    return remaining;
  }

  /**
   * Update the delay for future submissions. Already-queued items keep their original timing.
   * Clamped to [TICK_MS, 60000ms].
   *
   * @throws RangeError if `delayMs` is NaN; the previous delay stays in effect.
   */
  setDelay(delayMs: number): void {
    this.delaySlots = toDelaySlots(delayMs);
  }

  /**
   * Submit an item to be dispatched after the configured delay.
   * If an item with the same key already exists, it is replaced (dedup).
   * No-op if the wheel is stopped.
   */
  submit(key: string, data: T): void {
    if (!this.intervalId) return;

    // Dedup: remove existing entry for this key
    this.cancel(key);

    const slotIndex = (this.cursor + this.delaySlots) % NUM_SLOTS;
    // Resolve the slot before touching `entries`, so a failed lookup cannot
    // leave an entry registered in the map without a slot that will ever fire it.
    const slot = this.slot(slotIndex);

    this.entries.set(key, { key, data, slotIndex });
    slot.add(key);
  }

  /**
   * Cancel a pending item. Returns true if the item was found and removed.
   */
  cancel(key: string): boolean {
    const entry = this.entries.get(key);
    if (!entry) return false;

    this.slot(entry.slotIndex).delete(key);
    this.entries.delete(key);
    return true;
  }

  /** Number of pending items in the wheel. */
  get size(): number {
    return this.entries.size;
  }

  /** Whether the wheel is running. */
  get running(): boolean {
    return this.intervalId !== null;
  }

  /** Get a slot by index. The array is fully initialized so this always returns a Set. */
  private slot(index: number): Set<string> {
    const s = this.slots[index];
    if (!s) throw new Error(`TimerWheel: invalid slot index ${index}`);
    return s;
  }

  /**
   * Advance the cursor and dispatch all items in the current slot.
   *
   * Every due item is dispatched even when an `onExpire` call throws: the slot has
   * already been emptied at that point, so bailing out early would strand the
   * remaining items in `entries` with no slot left to fire them. The first error
   * is rethrown once the slot is drained; later errors from the same tick are dropped.
   */
  private tick(): void {
    this.cursor = (this.cursor + 1) % NUM_SLOTS;
    const slot = this.slot(this.cursor);

    if (slot.size === 0) return;

    // Snapshot the keys: callbacks may submit/cancel and mutate the slot while we iterate.
    const keys = [...slot];
    slot.clear();

    let failed = false;
    let firstError: unknown;

    for (const key of keys) {
      const entry = this.entries.get(key);
      // Already removed by a callback earlier in this tick (cancel() or stop()).
      if (!entry) continue;

      this.entries.delete(key);
      try {
        this.onExpire({ key, data: entry.data });
      } catch (error) {
        if (!failed) {
          failed = true;
          firstError = error;
        }
      }
    }

    if (failed) throw firstError;
  }
}
TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? "", + TRIGGER_WORKER_INSTANCE_NAME: env.TRIGGER_WORKER_INSTANCE_NAME, + TRIGGER_RUNNER_ID: runnerId, + TRIGGER_MACHINE_CPU: String(opts.machine.cpu), + TRIGGER_MACHINE_MEMORY: String(opts.machine.memory), + PRETTY_LOGS: String(env.RUNNER_PRETTY_LOGS), + }; + + if (this.opts.warmStartUrl) { + envVars.TRIGGER_WARM_START_URL = this.opts.warmStartUrl; + } + + if (env.COMPUTE_SNAPSHOTS_ENABLED && this.opts.metadataUrl) { + envVars.TRIGGER_METADATA_URL = this.opts.metadataUrl; + } + + if (this.opts.heartbeatIntervalSeconds) { + envVars.TRIGGER_HEARTBEAT_INTERVAL_SECONDS = String(this.opts.heartbeatIntervalSeconds); + } + + if (this.opts.snapshotPollIntervalSeconds) { + envVars.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS = String( + this.opts.snapshotPollIntervalSeconds + ); + } + + if (this.opts.additionalEnvVars) { + Object.assign(envVars, this.opts.additionalEnvVars); + } + + const headers: Record = { + "Content-Type": "application/json", + }; + + if (this.opts.gatewayAuthToken) { + headers["Authorization"] = `Bearer ${this.opts.gatewayAuthToken}`; + } + + // Strip image digest — resolve by tag, not digest + const imageRef = opts.image.split("@")[0]!; + + const url = `${this.opts.gatewayUrl}/api/instances`; + + // Wide event: single canonical log line emitted in finally + const event: Record = { + // High-cardinality identifiers + runId: opts.runFriendlyId, + runnerId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + // Environment + instanceName: env.TRIGGER_WORKER_INSTANCE_NAME, + // Supervisor timing + dequeueResponseMs: opts.dequeueResponseMs, + pollingIntervalMs: 
opts.pollingIntervalMs, + warmStartCheckMs: opts.warmStartCheckMs, + // Request + image: imageRef, + url, + }; + + const startMs = performance.now(); + + try { + const [fetchError, response] = await tryCatch( + fetch(url, { + method: "POST", + headers, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + body: JSON.stringify({ + name: runnerId, + image: imageRef, + env: envVars, + cpu: opts.machine.cpu, + memory_gb: opts.machine.memory, + metadata: { + runId: opts.runFriendlyId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + }, + }), + }) + ); + + if (fetchError) { + event.error = fetchError instanceof Error ? fetchError.message : String(fetchError); + event.errorType = + fetchError instanceof DOMException && fetchError.name === "TimeoutError" + ? "timeout" + : "fetch"; + return; + } + + event.status = response.status; + + if (!response.ok) { + const [bodyError, body] = await tryCatch(response.text()); + event.responseBody = bodyError ? undefined : body; + return; + } + + const [parseError, data] = await tryCatch(response.json()); + + if (parseError) { + event.error = parseError instanceof Error ? 
parseError.message : String(parseError); + event.errorType = "parse"; + return; + } + + event.instanceId = data.id; + event.ok = true; + } finally { + event.durationMs = Math.round(performance.now() - startMs); + event.ok ??= false; + this.logger.info("create instance", event); + } + } + + private get authHeaders(): Record { + const headers: Record = { + "Content-Type": "application/json", + }; + if (this.opts.gatewayAuthToken) { + headers["Authorization"] = `Bearer ${this.opts.gatewayAuthToken}`; + } + return headers; + } + + async snapshot(opts: { + runnerId: string; + callbackUrl: string; + metadata: Record; + }): Promise { + const url = `${this.opts.gatewayUrl}/api/instances/${opts.runnerId}/snapshot`; + + const [error, response] = await tryCatch( + fetch(url, { + method: "POST", + headers: this.authHeaders, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + body: JSON.stringify({ + callback: { + url: opts.callbackUrl, + metadata: opts.metadata, + }, + }), + }) + ); + + if (error) { + this.logger.error("snapshot request failed", { + runnerId: opts.runnerId, + error: error instanceof Error ? error.message : String(error), + }); + return false; + } + + if (response.status !== 202) { + this.logger.error("snapshot request rejected", { + runnerId: opts.runnerId, + status: response.status, + }); + return false; + } + + this.logger.info("snapshot request accepted", { runnerId: opts.runnerId }); + return true; + } + + async deleteInstance(runnerId: string): Promise { + const url = `${this.opts.gatewayUrl}/api/instances/${runnerId}`; + + const [error, response] = await tryCatch( + fetch(url, { + method: "DELETE", + headers: this.authHeaders, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + }) + ); + + if (error) { + this.logger.error("delete instance failed", { + runnerId, + error: error instanceof Error ? 
error.message : String(error), + }); + return false; + } + + if (!response.ok) { + this.logger.error("delete instance rejected", { + runnerId, + status: response.status, + }); + return false; + } + + this.logger.info("delete instance success", { runnerId }); + return true; + } + + async restore(opts: { + snapshotId: string; + runnerId: string; + runFriendlyId: string; + snapshotFriendlyId: string; + machine: { cpu: number; memory: number }; + }): Promise { + const url = `${this.opts.gatewayUrl}/api/snapshots/${opts.snapshotId}/restore`; + + const metadata: Record = { + TRIGGER_RUNNER_ID: opts.runnerId, + TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? "", + TRIGGER_WORKER_INSTANCE_NAME: env.TRIGGER_WORKER_INSTANCE_NAME, + }; + + const body = { + name: opts.runnerId, + metadata, + cpu: opts.machine.cpu, + memory_mb: opts.machine.memory * 1024, + }; + + this.logger.debug("restore request body", { url, body }); + + const [error, response] = await tryCatch( + fetch(url, { + method: "POST", + headers: this.authHeaders, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + body: JSON.stringify(body), + }) + ); + + if (error) { + this.logger.error("restore request failed", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + error: error instanceof Error ? 
error.message : String(error), + }); + return false; + } + + if (!response.ok) { + this.logger.error("restore request rejected", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + status: response.status, + }); + return false; + } + + this.logger.info("restore request success", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + }); + return true; + } +} diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index 90b61957795..82c7ea7b4c0 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -24,6 +24,10 @@ export interface WorkloadManagerCreateOptions { nextAttemptNumber?: number; dequeuedAt: Date; placementTags?: PlacementTag[]; + // Timing context (populated by supervisor handler, included in wide event) + dequeueResponseMs?: number; + pollingIntervalMs?: number; + warmStartCheckMs?: number; // identifiers envId: string; envType: EnvironmentType; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 35d53d36099..fcb7297d340 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -24,6 +24,8 @@ import { HttpServer, type CheckpointClient } from "@trigger.dev/core/v3/serverOn import { type IncomingMessage } from "node:http"; import { register } from "../metrics.js"; import { env } from "../env.js"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; +import { TimerWheel } from "../services/timerWheel.js"; // Use the official export when upgrading to socket.io@4.8.0 interface DefaultEventsMap { @@ -53,15 +55,31 @@ type WorkloadServerEvents = { ]; }; +const ComputeSnapshotCallbackBody = z.object({ + snapshot_id: z.string(), + instance_id: z.string(), + status: z.enum(["completed", "failed"]), + error: z.string().optional(), + metadata: z.record(z.string()).optional(), +}); + +type DelayedSnapshot = { + 
runnerId: string; + runFriendlyId: string; + snapshotFriendlyId: string; +}; + type WorkloadServerOptions = { port: number; host?: string; workerClient: SupervisorHttpClient; checkpointClient?: CheckpointClient; + computeManager?: ComputeWorkloadManager; }; export class WorkloadServer extends EventEmitter { private checkpointClient?: CheckpointClient; + private computeManager?: ComputeWorkloadManager; private readonly logger = new SimpleStructuredLogger("workload-server"); @@ -84,6 +102,7 @@ export class WorkloadServer extends EventEmitter { >(); private readonly workerClient: SupervisorHttpClient; + private readonly snapshotDelayWheel?: TimerWheel; constructor(opts: WorkloadServerOptions) { super(); @@ -93,6 +112,15 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; + this.computeManager = opts.computeManager; + + if (this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + this.snapshotDelayWheel = new TimerWheel({ + delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS, + onExpire: (item) => this.dispatchComputeSnapshot(item.data), + }); + this.snapshotDelayWheel.start(); + } this.httpServer = this.createHttpServer({ host, port }); this.websocketServer = this.createWebsocketServer(); @@ -231,11 +259,19 @@ export class WorkloadServer extends EventEmitter { handler: async ({ reply, params, req }) => { this.logger.debug("Suspend request", { params, headers: req.headers }); - if (!this.checkpointClient) { + const runnerId = this.runnerIdFromRequest(req); + const deploymentVersion = this.deploymentVersionFromRequest(req); + const projectRef = this.projectRefFromRequest(req); + + if (!runnerId || !deploymentVersion || !projectRef) { + this.logger.error("Invalid headers for suspend request", { + ...params, + headers: req.headers, + }); reply.json( { ok: false, - error: "Checkpoints disabled", + error: "Invalid headers", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -243,19 +279,38 @@ 
export class WorkloadServer extends EventEmitter { return; } - const runnerId = this.runnerIdFromRequest(req); - const deploymentVersion = this.deploymentVersionFromRequest(req); - const projectRef = this.projectRefFromRequest(req); + if (this.snapshotDelayWheel && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + if (!env.TRIGGER_WORKLOAD_API_DOMAIN) { + this.logger.error( + "TRIGGER_WORKLOAD_API_DOMAIN is not set, cannot create snapshot callback URL" + ); + reply.json({ error: "Snapshot callbacks not configured" }, false, 500); + return; + } - if (!runnerId || !deploymentVersion || !projectRef) { - this.logger.error("Invalid headers for suspend request", { - ...params, - headers: req.headers, + // Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints. + // If the run continues before the delay expires, the snapshot is cancelled. + reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202); + + this.snapshotDelayWheel.submit(params.runFriendlyId, { + runnerId, + runFriendlyId: params.runFriendlyId, + snapshotFriendlyId: params.snapshotFriendlyId, }); + + this.logger.debug("Snapshot delayed", { + runId: params.runFriendlyId, + delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS, + }); + + return; + } + + if (!this.checkpointClient) { reply.json( { ok: false, - error: "Invalid headers", + error: "Checkpoints disabled", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -298,6 +353,11 @@ export class WorkloadServer extends EventEmitter { handler: async ({ req, reply, params }) => { this.logger.debug("Run continuation request", { params }); + // Cancel any pending delayed snapshot for this run + if (this.snapshotDelayWheel?.cancel(params.runFriendlyId)) { + this.logger.debug("Cancelled delayed snapshot", { runId: params.runFriendlyId }); + } + const continuationResult = await this.workerClient.continueRunExecution( params.runFriendlyId, params.snapshotFriendlyId, @@ -394,6 +454,76 @@ export class WorkloadServer extends 
EventEmitter { }); } + // Compute snapshot callback endpoint + httpServer.route("/api/v1/compute/snapshot-complete", "POST", { + bodySchema: ComputeSnapshotCallbackBody, + handler: async ({ reply, body }) => { + this.logger.info("Compute snapshot callback", { + snapshotId: body.snapshot_id, + instanceId: body.instance_id, + status: body.status, + error: body.error, + metadata: body.metadata, + }); + + const runId = body.metadata?.runId; + const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; + + if (!runId || !snapshotFriendlyId) { + this.logger.error("Compute snapshot callback missing metadata", { body }); + reply.empty(400); + return; + } + + if (body.status === "completed") { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: true, + checkpoint: { + type: "KUBERNETES", + location: body.snapshot_id, + }, + }, + }); + + if (result.success) { + this.logger.info("Suspend completion submitted", { + runId, + instanceId: body.instance_id, + snapshotId: body.snapshot_id, + }); + } else { + this.logger.error("Failed to submit suspend completion", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } else { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: false, + error: body.error ?? "Snapshot failed", + }, + }); + + if (!result.success) { + this.logger.error("Failed to submit suspend failure", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } + + reply.empty(200); + }, + }); + return httpServer; } @@ -588,11 +718,46 @@ export class WorkloadServer extends EventEmitter { } } + /** + * Dispatch a compute snapshot request to the gateway. Called by the timer wheel + * when the delay expires, or immediately during drain. 
+ */ + private async dispatchComputeSnapshot(snapshot: DelayedSnapshot): Promise { + if (!this.computeManager) return; + + const callbackUrl = `${env.TRIGGER_WORKLOAD_API_PROTOCOL}://${env.TRIGGER_WORKLOAD_API_DOMAIN}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}/api/v1/compute/snapshot-complete`; + + const result = await this.computeManager.snapshot({ + runnerId: snapshot.runnerId, + callbackUrl, + metadata: { + runId: snapshot.runFriendlyId, + snapshotFriendlyId: snapshot.snapshotFriendlyId, + }, + }); + + if (!result) { + this.logger.error("Failed to request compute snapshot", { + runId: snapshot.runFriendlyId, + runnerId: snapshot.runnerId, + }); + } + } + async start() { await this.httpServer.start(); } async stop() { + const remaining = this.snapshotDelayWheel?.stop() ?? []; + if (remaining.length > 0) { + this.logger.info("Snapshot delay wheel stopped, dropped pending snapshots", { + count: remaining.length, + }); + this.logger.debug("Dropped snapshot details", { + runs: remaining.map((item) => item.key), + }); + } await this.httpServer.stop(); } } diff --git a/packages/cli-v3/src/deploy/buildImage.ts b/packages/cli-v3/src/deploy/buildImage.ts index 2225d7db056..31a2b658545 100644 --- a/packages/cli-v3/src/deploy/buildImage.ts +++ b/packages/cli-v3/src/deploy/buildImage.ts @@ -205,6 +205,7 @@ async function remoteBuildImage(options: DepotBuildImageOptions): Promise { + onDequeue: async (messages, timing) => { // Always update queue length, default to 0 for empty dequeues or missing value this.updateQueueLength(messages[0]?.workerQueueLength ?? 
0); // Forward to the original handler - await this.consumerOptions.onDequeue(messages); + await this.consumerOptions.onDequeue(messages, timing); }, }); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/events.ts b/packages/core/src/v3/runEngineWorker/supervisor/events.ts index a51c504a3e6..df4a93686a9 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/events.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/events.ts @@ -6,6 +6,8 @@ export type WorkerEvents = { { time: Date; message: DequeuedMessage; + dequeueResponseMs?: number; + pollingIntervalMs?: number; }, ]; requestRunAttemptStart: [ diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 4379eb54f37..76faee40809 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -15,7 +15,7 @@ export type RunQueueConsumerOptions = { preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; - onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise; }; export class RunQueueConsumer implements QueueConsumer { @@ -23,13 +23,14 @@ export class RunQueueConsumer implements QueueConsumer { private readonly preDequeue?: PreDequeueFn; private readonly preSkip?: PreSkipFn; private readonly maxRunCount?: number; - private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise; private readonly logger = new SimpleStructuredLogger("queue-consumer"); private intervalMs: number; private idleIntervalMs: number; private isEnabled: boolean; + private lastScheduledIntervalMs: number; constructor(opts: 
RunQueueConsumerOptions) { this.isEnabled = false; @@ -38,6 +39,7 @@ export class RunQueueConsumer implements QueueConsumer { this.preDequeue = opts.preDequeue; this.preSkip = opts.preSkip; this.maxRunCount = opts.maxRunCount; + this.lastScheduledIntervalMs = opts.idleIntervalMs; this.onDequeue = opts.onDequeue; this.client = opts.client; } @@ -111,16 +113,18 @@ export class RunQueueConsumer implements QueueConsumer { let nextIntervalMs = this.idleIntervalMs; try { + const dequeueStart = performance.now(); const response = await this.client.dequeue({ maxResources: preDequeueResult?.maxResources, maxRunCount: this.maxRunCount, }); + const dequeueResponseMs = Math.round(performance.now() - dequeueStart); if (!response.success) { this.logger.error("Failed to dequeue", { error: response.error }); } else { try { - await this.onDequeue(response.data); + await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs }); if (response.data.length > 0) { nextIntervalMs = this.intervalMs; @@ -141,6 +145,7 @@ export class RunQueueConsumer implements QueueConsumer { this.logger.verbose("scheduled dequeue with idle interval", { delayMs }); } + this.lastScheduledIntervalMs = delayMs; setTimeout(this.dequeue.bind(this), delayMs); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index e5a783b8d41..b2d344fb3dc 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -80,13 +80,15 @@ export class SupervisorSession extends EventEmitter { }); } - private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise { + private async onDequeue(messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }): Promise { this.logger.verbose("Dequeued messages with contents", { count: messages.length, messages }); for (const message of 
messages) { this.emit("runQueueMessage", { time: new Date(), message, + dequeueResponseMs: timing?.dequeueResponseMs, + pollingIntervalMs: timing?.pollingIntervalMs, }); } }