-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(supervisor): compute workload manager #3114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
7cbd3aa
ccc8fe2
3175a10
56ef39f
1bccd1e
74817d7
a538735
ac3dadf
5a7b8ce
7e251d4
e4915c4
4d603ad
9466a47
c1511f9
0a6d6f1
4332743
5089bba
7ed9221
e9b5fd3
0531a23
9572c7d
5032b7f
f3e0cb8
0edc308
63424fa
80b62d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "trigger.dev": patch | ||
| --- | ||
|
|
||
| Fix `--load` flag being silently ignored on local/self-hosted builds. |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ import { | |
| } from "./resourceMonitor.js"; | ||
| import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js"; | ||
| import { DockerWorkloadManager } from "./workloadManager/docker.js"; | ||
| import { ComputeWorkloadManager } from "./workloadManager/compute.js"; | ||
| import { | ||
| HttpServer, | ||
| CheckpointClient, | ||
|
|
@@ -35,9 +36,11 @@ class ManagedSupervisor { | |
| private readonly metricsServer?: HttpServer; | ||
| private readonly workloadServer: WorkloadServer; | ||
| private readonly workloadManager: WorkloadManager; | ||
| private readonly computeManager?: ComputeWorkloadManager; | ||
| private readonly logger = new SimpleStructuredLogger("managed-supervisor"); | ||
| private readonly resourceMonitor: ResourceMonitor; | ||
| private readonly checkpointClient?: CheckpointClient; | ||
| private readonly isComputeMode: boolean; | ||
|
|
||
| private readonly podCleaner?: PodCleaner; | ||
| private readonly failedPodHandler?: FailedPodHandler; | ||
|
|
@@ -77,9 +80,22 @@ class ManagedSupervisor { | |
| : new DockerResourceMonitor(new Docker()) | ||
| : new NoopResourceMonitor(); | ||
|
|
||
| this.workloadManager = this.isKubernetes | ||
| ? new KubernetesWorkloadManager(workloadManagerOptions) | ||
| : new DockerWorkloadManager(workloadManagerOptions); | ||
| this.isComputeMode = !!env.COMPUTE_GATEWAY_URL; | ||
|
|
||
| if (env.COMPUTE_GATEWAY_URL) { | ||
| const computeManager = new ComputeWorkloadManager({ | ||
| ...workloadManagerOptions, | ||
| gatewayUrl: env.COMPUTE_GATEWAY_URL, | ||
| gatewayAuthToken: env.COMPUTE_GATEWAY_AUTH_TOKEN, | ||
| gatewayTimeoutMs: env.COMPUTE_GATEWAY_TIMEOUT_MS, | ||
| }); | ||
| this.computeManager = computeManager; | ||
| this.workloadManager = computeManager; | ||
| } else { | ||
| this.workloadManager = this.isKubernetes | ||
| ? new KubernetesWorkloadManager(workloadManagerOptions) | ||
| : new DockerWorkloadManager(workloadManagerOptions); | ||
| } | ||
|
|
||
| if (this.isKubernetes) { | ||
| if (env.POD_CLEANER_ENABLED) { | ||
|
|
@@ -187,7 +203,7 @@ class ManagedSupervisor { | |
| this.workloadServer.notifyRun({ run }); | ||
| }); | ||
|
|
||
| this.workerSession.on("runQueueMessage", async ({ time, message }) => { | ||
| this.workerSession.on("runQueueMessage", async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => { | ||
| this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message); | ||
|
|
||
| if (message.completedWaitpoints.length > 0) { | ||
|
|
@@ -207,6 +223,33 @@ class ManagedSupervisor { | |
| if (checkpoint) { | ||
| this.logger.log("Restoring run", { runId: message.run.id }); | ||
|
|
||
| if (this.isComputeMode && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { | ||
| try { | ||
| // Derive runnerId unique per restore cycle (matches iceman's pattern) | ||
| const runIdShort = message.run.friendlyId.replace("run_", ""); | ||
| const checkpointSuffix = checkpoint.id.slice(-8); | ||
| const runnerId = `runner-${runIdShort}-${checkpointSuffix}`; | ||
|
|
||
| const didRestore = await this.computeManager.restore({ | ||
| snapshotId: checkpoint.location, | ||
| runnerId, | ||
| runFriendlyId: message.run.friendlyId, | ||
| snapshotFriendlyId: message.snapshot.friendlyId, | ||
| machine: message.run.machine, | ||
| }); | ||
|
|
||
| if (didRestore) { | ||
| this.logger.log("Compute restore successful", { runId: message.run.id, runnerId }); | ||
| } else { | ||
| this.logger.error("Compute restore failed", { runId: message.run.id, runnerId }); | ||
| } | ||
| } catch (error) { | ||
| this.logger.error("Failed to restore run (compute)", { error }); | ||
| } | ||
|
|
||
| return; | ||
| } | ||
|
|
||
| if (!this.checkpointClient) { | ||
| this.logger.error("No checkpoint client", { runId: message.run.id }); | ||
| return; | ||
|
|
@@ -236,7 +279,9 @@ class ManagedSupervisor { | |
|
|
||
| this.logger.log("Scheduling run", { runId: message.run.id }); | ||
|
|
||
| const warmStartStart = performance.now(); | ||
| const didWarmStart = await this.tryWarmStart(message); | ||
| const warmStartCheckMs = Math.round(performance.now() - warmStartStart); | ||
|
|
||
| if (didWarmStart) { | ||
| this.logger.log("Warm start successful", { runId: message.run.id }); | ||
|
|
@@ -252,6 +297,9 @@ class ManagedSupervisor { | |
|
|
||
| await this.workloadManager.create({ | ||
| dequeuedAt: message.dequeuedAt, | ||
| dequeueResponseMs, | ||
| pollingIntervalMs, | ||
| warmStartCheckMs, | ||
| envId: message.environment.id, | ||
| envType: message.environment.type, | ||
| image: message.image, | ||
|
|
@@ -296,6 +344,7 @@ class ManagedSupervisor { | |
| host: env.TRIGGER_WORKLOAD_API_HOST_INTERNAL, | ||
| workerClient: this.workerSession.httpClient, | ||
| checkpointClient: this.checkpointClient, | ||
| computeManager: this.computeManager, | ||
| }); | ||
|
|
||
| this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); | ||
|
|
@@ -380,6 +429,7 @@ class ManagedSupervisor { | |
|
|
||
| async stop() { | ||
| this.logger.log("Shutting down"); | ||
| await this.workloadServer.stop(); | ||
| await this.workerSession.stop(); | ||
|
Comment on lines
430
to
433
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Quiesce the worker session before tearing down the workload API.
🤖 Prompt for AI Agents |
||
|
|
||
| // Optional services | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,254 @@ | ||
| import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; | ||
| import { TimerWheel } from "./timerWheel.js"; | ||
|
|
||
| describe("TimerWheel", () => { | ||
| beforeEach(() => { | ||
| vi.useFakeTimers(); | ||
| }); | ||
|
|
||
| afterEach(() => { | ||
| vi.useRealTimers(); | ||
| }); | ||
|
|
||
| it("dispatches item after delay", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
| wheel.submit("run-1", "snapshot-data"); | ||
|
|
||
| // Not yet | ||
| vi.advanceTimersByTime(2900); | ||
| expect(dispatched).toEqual([]); | ||
|
|
||
| // After delay | ||
| vi.advanceTimersByTime(200); | ||
| expect(dispatched).toEqual(["run-1"]); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("cancels item before it fires", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
| wheel.submit("run-1", "data"); | ||
|
|
||
| vi.advanceTimersByTime(1000); | ||
| expect(wheel.cancel("run-1")).toBe(true); | ||
|
|
||
| vi.advanceTimersByTime(5000); | ||
| expect(dispatched).toEqual([]); | ||
| expect(wheel.size).toBe(0); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("cancel returns false for unknown key", () => { | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: () => {}, | ||
| }); | ||
| expect(wheel.cancel("nonexistent")).toBe(false); | ||
| }); | ||
|
|
||
| it("deduplicates: resubmitting same key replaces the entry", () => { | ||
| const dispatched: { key: string; data: string }[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push({ key: item.key, data: item.data }), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
| wheel.submit("run-1", "old-data"); | ||
|
|
||
| vi.advanceTimersByTime(1000); | ||
| wheel.submit("run-1", "new-data"); | ||
|
|
||
| // Original would have fired at t=3000, but was replaced | ||
| // New one fires at t=1000+3000=4000 | ||
| vi.advanceTimersByTime(2100); | ||
| expect(dispatched).toEqual([]); | ||
|
|
||
| vi.advanceTimersByTime(1000); | ||
| expect(dispatched).toEqual([{ key: "run-1", data: "new-data" }]); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("handles many concurrent items", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
|
|
||
| for (let i = 0; i < 1000; i++) { | ||
| wheel.submit(`run-${i}`, `data-${i}`); | ||
| } | ||
| expect(wheel.size).toBe(1000); | ||
|
|
||
| vi.advanceTimersByTime(3100); | ||
| expect(dispatched.length).toBe(1000); | ||
| expect(wheel.size).toBe(0); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("handles items submitted at different times", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
|
|
||
| wheel.submit("run-1", "data"); | ||
| vi.advanceTimersByTime(1000); | ||
| wheel.submit("run-2", "data"); | ||
| vi.advanceTimersByTime(1000); | ||
| wheel.submit("run-3", "data"); | ||
|
|
||
| // t=2000: nothing yet | ||
| expect(dispatched).toEqual([]); | ||
|
|
||
| // t=3100: run-1 fires | ||
| vi.advanceTimersByTime(1100); | ||
| expect(dispatched).toEqual(["run-1"]); | ||
|
|
||
| // t=4100: run-2 fires | ||
| vi.advanceTimersByTime(1000); | ||
| expect(dispatched).toEqual(["run-1", "run-2"]); | ||
|
|
||
| // t=5100: run-3 fires | ||
| vi.advanceTimersByTime(1000); | ||
| expect(dispatched).toEqual(["run-1", "run-2", "run-3"]); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("setDelay changes delay for new items only", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
|
|
||
| wheel.submit("run-1", "data"); // 3s delay | ||
|
|
||
| vi.advanceTimersByTime(500); | ||
| wheel.setDelay(1000); | ||
| wheel.submit("run-2", "data"); // 1s delay | ||
|
|
||
| // t=1500: run-2 should have fired (submitted at t=500 with 1s delay) | ||
| vi.advanceTimersByTime(1100); | ||
| expect(dispatched).toEqual(["run-2"]); | ||
|
|
||
| // t=3100: run-1 fires at its original 3s delay | ||
| vi.advanceTimersByTime(1500); | ||
| expect(dispatched).toEqual(["run-2", "run-1"]); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("stop returns unprocessed items", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
| wheel.submit("run-1", "data-1"); | ||
| wheel.submit("run-2", "data-2"); | ||
| wheel.submit("run-3", "data-3"); | ||
|
|
||
| const remaining = wheel.stop(); | ||
| expect(dispatched).toEqual([]); | ||
| expect(wheel.size).toBe(0); | ||
| expect(remaining.length).toBe(3); | ||
| expect(remaining.map((r) => r.key).sort()).toEqual(["run-1", "run-2", "run-3"]); | ||
| expect(remaining.find((r) => r.key === "run-1")?.data).toBe("data-1"); | ||
| }); | ||
|
|
||
| it("after stop, new submissions are silently dropped", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
| wheel.stop(); | ||
|
|
||
| wheel.submit("run-late", "data"); | ||
| expect(dispatched).toEqual([]); | ||
| expect(wheel.size).toBe(0); | ||
| }); | ||
|
|
||
| it("tracks size correctly through submit/cancel/dispatch", () => { | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: () => {}, | ||
| }); | ||
|
|
||
| wheel.start(); | ||
|
|
||
| wheel.submit("a", "data"); | ||
| wheel.submit("b", "data"); | ||
| expect(wheel.size).toBe(2); | ||
|
|
||
| wheel.cancel("a"); | ||
| expect(wheel.size).toBe(1); | ||
|
|
||
| vi.advanceTimersByTime(3100); | ||
| expect(wheel.size).toBe(0); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("clamps delay to valid range", () => { | ||
| const dispatched: string[] = []; | ||
|
|
||
| // Very small delay (should be at least 1 tick = 100ms) | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 0, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
|
|
||
| wheel.start(); | ||
| wheel.submit("run-1", "data"); | ||
|
|
||
| vi.advanceTimersByTime(200); | ||
| expect(dispatched).toEqual(["run-1"]); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
|
|
||
| it("multiple cancel calls are safe", () => { | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: () => {}, | ||
| }); | ||
|
|
||
| wheel.start(); | ||
| wheel.submit("run-1", "data"); | ||
|
|
||
| expect(wheel.cancel("run-1")).toBe(true); | ||
| expect(wheel.cancel("run-1")).toBe(false); | ||
|
|
||
| wheel.stop(); | ||
| }); | ||
| }); |
Uh oh!
There was an error while loading. Please reload this page.