Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/agent-os/package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/agent-os/src/sandbox/client.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 22 additions & 22 deletions examples/agent-os/src/sandbox/server.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import { agentOs } from "rivetkit/agent-os";

export const agentOsTestActor = agentOs({ options: { software: [common] } });

// Same actor using the per-instance createOptions factory path. The factory
// returns identical options to the static fixture above so the driver tests
// can verify both paths produce a working VM.
export const agentOsCreateOptionsTestActor = agentOs({
createOptions: async () => ({ software: [common] }),
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import {
accessControlActor,
accessControlNoQueuesActor,
} from "./access-control";
import { agentOsTestActor } from "./agent-os";

import { inputActor } from "./action-inputs";
import {
defaultTimeoutActor,
Expand All @@ -20,8 +18,8 @@ import {
import { dbActorDrizzle } from "./actor-db-drizzle";
import { dbActorRaw } from "./actor-db-raw";
import { onStateChangeActor } from "./actor-onstatechange";
import { agentOsCreateOptionsTestActor, agentOsTestActor } from "./agent-os";
import { connErrorSerializationActor } from "./conn-error-serialization";
import { dbPragmaMigrationActor } from "./db-pragma-migration";
import { counterWithParams } from "./conn-params";
import { connStateActor } from "./conn-state";
// Import actors from individual files
Expand All @@ -33,13 +31,11 @@ import {
dbLifecycleFailing,
dbLifecycleObserver,
} from "./db-lifecycle";
import { dbPragmaMigrationActor } from "./db-pragma-migration";
import { destroyActor, destroyObserver } from "./destroy";
import { customTimeoutActor, errorHandlingActor } from "./error-handling";
import { fileSystemHibernationCleanupActor } from "./file-system-hibernation-cleanup";
import {
hibernationActor,
hibernationSleepWindowActor,
} from "./hibernation";
import { hibernationActor, hibernationSleepWindowActor } from "./hibernation";
import { inlineClientActor } from "./inline-client";
import { kvActor } from "./kv";
import { largePayloadActor, largePayloadConnActor } from "./large-payloads";
Expand Down Expand Up @@ -76,41 +72,41 @@ import {
sleep,
sleepRawWsAddEventListenerClose,
sleepRawWsAddEventListenerMessage,
sleepRawWsDelayedSendOnSleep,
sleepRawWsOnClose,
sleepRawWsOnMessage,
sleepRawWsSendOnSleep,
sleepWithLongRpc,
sleepWithNoSleepOption,
sleepWithPreventSleep,
sleepWithRawHttp,
sleepWithRawWebSocket,
sleepWithWaitUntilMessage,
sleepRawWsOnClose,
sleepRawWsOnMessage,
sleepRawWsSendOnSleep,
sleepRawWsDelayedSendOnSleep,
sleepWithWaitUntilInOnWake,
sleepWithWaitUntilMessage,
} from "./sleep";
import {
sleepWithDb,
sleepWithSlowScheduledDb,
sleepWithDbConn,
sleepWithDbAction,
sleepWithRawWsCloseDb,
sleepWithRawWsCloseDbListener,
sleepWsMessageExceedsGrace,
sleepWsConcurrentDbExceedsGrace,
sleepWaitUntil,
sleepNestedWaitUntil,
sleepEnqueue,
sleepScheduleAfter,
sleepNestedWaitUntil,
sleepOnSleepThrows,
sleepScheduleAfter,
sleepWaitUntil,
sleepWaitUntilRejects,
sleepWaitUntilState,
sleepWithDb,
sleepWithDbAction,
sleepWithDbConn,
sleepWithRawWs,
sleepWithRawWsCloseDb,
sleepWithRawWsCloseDbListener,
sleepWithSlowScheduledDb,
sleepWsActiveDbExceedsGrace,
sleepWsConcurrentDbExceedsGrace,
sleepWsMessageExceedsGrace,
sleepWsRawDbAfterClose,
} from "./sleep-db";
import { lifecycleObserver, startStopRaceActor } from "./start-stop-race";
import { statelessActor } from "./stateless";
import { stateZodCoercionActor } from "./state-zod-coercion";
import { statelessActor } from "./stateless";
import {
driverCtxActor,
dynamicVarActor,
Expand All @@ -131,8 +127,8 @@ import {
workflowNestedLoopActor,
workflowNestedRaceActor,
workflowQueueActor,
workflowRunningStepActor,
workflowReplayActor,
workflowRunningStepActor,
workflowSleepActor,
workflowSpawnChildActor,
workflowSpawnParentActor,
Expand Down Expand Up @@ -304,5 +300,6 @@ export const registry = setup({
stateZodCoercionActor,
// From agent-os.ts
agentOsTestActor,
agentOsCreateOptionsTestActor,
},
});
101 changes: 76 additions & 25 deletions rivetkit-typescript/packages/rivetkit/src/agent-os/actor/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AgentOs, createInMemoryFileSystem } from "@rivet-dev/agent-os-core";
import type { AgentOsOptions, MountConfig } from "@rivet-dev/agent-os-core";
import { AgentOs, createInMemoryFileSystem } from "@rivet-dev/agent-os-core";
import type { DatabaseProvider } from "@/actor/database";
import { actor, event } from "@/actor/mod";
import type { RawAccess } from "@/db/config";
Expand Down Expand Up @@ -42,35 +42,75 @@ async function ensureVm<TConnParams>(
c: AgentOsActionContext<TConnParams>,
config: AgentOsActorConfig<TConnParams>,
): Promise<AgentOs> {
// Fast path: VM already booted.
if (c.vars.agentOs) {
return c.vars.agentOs;
}

const start = Date.now();
// Guard against concurrent callers. If another action is already booting
// the VM, wait for that same promise instead of creating a duplicate.
if (c.vars.vmBootGuard) {
return c.vars.vmBootGuard;
}

// Build options with in-memory VFS as default working directory mount.
const options = buildVmOptions(config.options);
const bootPromise = (async (): Promise<AgentOs> => {
const start = Date.now();

const agentOs = await AgentOs.create(options);
c.vars.agentOs = agentOs;
// Resolve options from the per-actor-instance factory or the static config.
let resolvedOptions: AgentOsOptions | undefined;
if (config.createOptions) {
c.log.debug({ msg: "agent-os resolving createOptions" });
try {
resolvedOptions = await config.createOptions(c);
} catch (err) {
throw new Error(
`agentOs: createOptions callback failed: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err },
);
}
} else {
resolvedOptions = config.options;
}

// Wire cron events to actor events.
agentOs.onCronEvent((cronEvent) => {
c.broadcast("cronEvent", { event: cronEvent });
});
if (!resolvedOptions) {
throw new Error(
"agentOs: createOptions callback returned a falsy value. It must return an AgentOsOptions object.",
);
}

c.broadcast("vmBooted", {});
c.log.info({
msg: "agent-os vm booted",
bootDurationMs: Date.now() - start,
});
// Build options with in-memory VFS as default working directory mount.
const options = buildVmOptions(resolvedOptions);

const agentOs = await AgentOs.create(options);
c.vars.agentOs = agentOs;

// Wire cron events to actor events.
agentOs.onCronEvent((cronEvent) => {
c.broadcast("cronEvent", { event: cronEvent });
});

c.broadcast("vmBooted", {});
c.log.info({
msg: "agent-os vm booted",
bootDurationMs: Date.now() - start,
});

return agentOs;
})();

c.vars.vmBootGuard = bootPromise;

return agentOs;
try {
return await bootPromise;
} catch (err) {
// Clear the cached promise on failure so the next caller retries
// instead of reusing a rejected promise.
c.vars.vmBootGuard = null;
throw err;
}
}

function buildVmOptions(
userOptions?: AgentOsOptions,
): AgentOsOptions {
function buildVmOptions(userOptions?: AgentOsOptions): AgentOsOptions {
const userMounts = userOptions?.mounts ?? [];

// Check if the user already provided a mount at /home/user. If so, respect
Expand Down Expand Up @@ -171,9 +211,12 @@ export function agentOs<TConnParams = undefined>(
sleepGracePeriod: 900_000,
actionTimeout: 900_000,
},
createState: async () => ({}),
createState: async () => ({
sandboxId: null,
}),
createVars: () => ({
agentOs: null,
vmBootGuard: null,
activeSessionIds: new Set<string>(),
activeProcesses: new Set<number>(),
activeHooks: new Set<Promise<void>>(),
Expand Down Expand Up @@ -216,9 +259,13 @@ export function agentOs<TConnParams = undefined>(
activeShells: c.vars.activeShells.size,
});

if (c.vars.agentOs) {
await c.vars.agentOs.dispose();
try {
if (c.vars.agentOs) {
await c.vars.agentOs.dispose();
}
} finally {
c.vars.agentOs = null;
c.vars.vmBootGuard = null;
}

c.broadcast("vmShutdown", { reason: "sleep" as const });
Expand All @@ -231,9 +278,13 @@ export function agentOs<TConnParams = undefined>(
activeShells: c.vars.activeShells.size,
});

if (c.vars.agentOs) {
await c.vars.agentOs.dispose();
try {
if (c.vars.agentOs) {
await c.vars.agentOs.dispose();
}
} finally {
c.vars.agentOs = null;
c.vars.vmBootGuard = null;
}

c.broadcast("vmShutdown", { reason: "destroy" as const });
Expand Down Expand Up @@ -264,4 +315,4 @@ const processExitToken = event<ProcessExitPayload>();
const shellDataToken = event<ShellDataPayload>();
const cronEventToken = event<CronEventPayload>();

export { ensureVm, syncPreventSleep, runHook };
export { ensureVm, runHook, syncPreventSleep };
Loading
Loading