From 6ea161409109d45092789f270be0c6d02552ede3 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 29 Mar 2026 01:28:50 -0700 Subject: [PATCH] fix(rivetkit): disk I/O error on destroy vfs --- .../fixtures/driver-test-suite/registry.ts | 10 + .../fixtures/driver-test-suite/sleep-db.ts | 466 ++++++++++++++++++ .../packages/rivetkit/src/db/mod.ts | 123 +++-- .../packages/rivetkit/src/db/shared.ts | 18 +- .../driver-test-suite/tests/actor-sleep-db.ts | 334 +++++++++++++ .../packages/sqlite-vfs/src/types.ts | 6 + .../packages/sqlite-vfs/src/vfs.ts | 44 +- 7 files changed, 941 insertions(+), 60 deletions(-) diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts index 652bd1a4a5..1a7a1e1e70 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -94,6 +94,8 @@ import { sleepWithDbAction, sleepWithRawWsCloseDb, sleepWithRawWsCloseDbListener, + sleepWsMessageExceedsGrace, + sleepWsConcurrentDbExceedsGrace, sleepWaitUntil, sleepNestedWaitUntil, sleepEnqueue, @@ -102,6 +104,10 @@ import { sleepWaitUntilRejects, sleepWaitUntilState, sleepWithRawWs, + sleepWsMessageExceedsGrace, + sleepWsConcurrentDbExceedsGrace, + sleepWsActiveDbExceedsGrace, + sleepWsRawDbAfterClose, } from "./sleep-db"; import { lifecycleObserver, startStopRaceActor } from "./start-stop-race"; import { statelessActor } from "./stateless"; @@ -180,6 +186,10 @@ export const registry = setup({ sleepWithRawWs, sleepWithRawWsCloseDb, sleepWithRawWsCloseDbListener, + sleepWsMessageExceedsGrace, + sleepWsConcurrentDbExceedsGrace, + sleepWsActiveDbExceedsGrace, + sleepWsRawDbAfterClose, // From error-handling.ts errorHandlingActor, customTimeoutActor, diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep-db.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep-db.ts index 6163a55d5d..b22d99dbcc 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep-db.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep-db.ts @@ -1,6 +1,7 @@ import type { UniversalWebSocket } from "rivetkit"; import { actor, event, queue } from "rivetkit"; import { db } from "rivetkit/db"; +import type { IDatabase, KvVfsOptions } from "@rivetkit/sqlite-vfs"; import { RAW_WS_HANDLER_DELAY, RAW_WS_HANDLER_SLEEP_TIMEOUT, @@ -812,6 +813,314 @@ export const sleepWithRawWsCloseDb = actor({ }, }); +// Grace period shorter than the handler's async work, so the DB gets +// cleaned up while the handler is still running. +const EXCEEDS_GRACE_HANDLER_DELAY = 2000; +const EXCEEDS_GRACE_PERIOD = 200; +const EXCEEDS_GRACE_SLEEP_TIMEOUT = 100; + +export { EXCEEDS_GRACE_HANDLER_DELAY, EXCEEDS_GRACE_PERIOD, EXCEEDS_GRACE_SLEEP_TIMEOUT }; + +// Number of sequential DB writes the handler performs. The loop runs long +// enough that shutdown (close()) runs between two writes. The write that +// follows close() hits the destroyed DB. +const ACTIVE_DB_WRITE_COUNT = 500; +const ACTIVE_DB_WRITE_DELAY_MS = 5; +const ACTIVE_DB_GRACE_PERIOD = 50; +const ACTIVE_DB_SLEEP_TIMEOUT = 500; + +export { + ACTIVE_DB_WRITE_COUNT, + ACTIVE_DB_WRITE_DELAY_MS, + ACTIVE_DB_GRACE_PERIOD, + ACTIVE_DB_SLEEP_TIMEOUT, +}; + +// Reproduces the production "disk I/O error" scenario: the handler is +// actively performing sequential DB writes (each one acquires and releases +// the wrapper mutex) when the grace period expires. Between two writes, +// client.close() acquires the mutex, sets closed=true, then calls +// db.close() outside the mutex. The next write acquires the mutex and +// calls ensureOpen() which throws "Database is closed". +// +// Without ensureOpen (as in the production version), the write would +// call db.exec() on the already-closing database concurrently with +// db.close(), producing "disk I/O error" or "cannot start a transaction +// within a transaction". +export const sleepWsActiveDbExceedsGrace = actor({ + state: { + startCount: 0, + sleepCount: 0, + writesCompleted: 0, + writeError: null as string | null, + }, + db: db({ + onMigrate: async (db) => { + await db.execute(` + CREATE TABLE IF NOT EXISTS sleep_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + `); + }, + }), + onWake: async (c) => { + c.state.startCount += 1; + }, + onSleep: async (c) => { + c.state.sleepCount += 1; + }, + onWebSocket: (c, ws: UniversalWebSocket) => { + ws.addEventListener("message", async (event: any) => { + if (event.data !== "start-writes") return; + + ws.send(JSON.stringify({ type: "started" })); + + // Perform many sequential DB writes. Each write acquires and + // releases the DB wrapper mutex. Between two writes, the + // shutdown's client.close() can slip in and close the DB. + for (let i = 0; i < ACTIVE_DB_WRITE_COUNT; i++) { + try { + await c.db.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('write-${i}', ${Date.now()})`, + ); + c.state.writesCompleted = i + 1; + } catch (error) { + c.state.writeError = + error instanceof Error ? error.message : String(error); + ws.send( + JSON.stringify({ + type: "error", + index: i, + error: c.state.writeError, + }), + ); + return; + } + + // Small delay between writes to yield the event loop and + // allow shutdown tasks to run. + await new Promise((resolve) => + setTimeout(resolve, ACTIVE_DB_WRITE_DELAY_MS), + ); + } + + ws.send(JSON.stringify({ type: "finished" })); + }); + + ws.send(JSON.stringify({ type: "connected" })); + }, + actions: { + triggerSleep: (c) => { + c.sleep(); + }, + getStatus: (c) => ({ + startCount: c.state.startCount, + sleepCount: c.state.sleepCount, + writesCompleted: c.state.writesCompleted, + writeError: c.state.writeError, + }), + getLogEntries: async (c) => { + return await c.db.execute<{ + id: number; + event: string; + created_at: number; + }>(`SELECT * FROM sleep_log ORDER BY id`); + }, + }, + options: { + sleepTimeout: ACTIVE_DB_SLEEP_TIMEOUT, + sleepGracePeriod: ACTIVE_DB_GRACE_PERIOD, + }, +}); + +export const sleepWsMessageExceedsGrace = actor({ + state: { + startCount: 0, + sleepCount: 0, + messageStarted: 0, + messageFinished: 0, + }, + db: db({ + onMigrate: async (db) => { + await db.execute(` + CREATE TABLE IF NOT EXISTS sleep_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + `); + }, + }), + onWake: async (c) => { + c.state.startCount += 1; + await c.db.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('wake', ${Date.now()})`, + ); + }, + onSleep: async (c) => { + c.state.sleepCount += 1; + await c.db.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('sleep', ${Date.now()})`, + ); + }, + onWebSocket: (c, ws: UniversalWebSocket) => { + ws.addEventListener("message", async (event: any) => { + if (event.data !== "slow-db-work") return; + + c.state.messageStarted += 1; + + await c.db.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('msg-start', ${Date.now()})`, + ); + + ws.send(JSON.stringify({ type: "started" })); + + // Wait longer than the grace period so shutdown times out + // and cleans up the database while this handler is still running. + await new Promise((resolve) => + setTimeout(resolve, EXCEEDS_GRACE_HANDLER_DELAY), + ); + + // This DB write runs after the grace period expired and + // #cleanupDatabase already destroyed the SQLite VFS. + await c.db.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('msg-finish', ${Date.now()})`, + ); + + c.state.messageFinished += 1; + }); + + ws.send(JSON.stringify({ type: "connected" })); + }, + actions: { + triggerSleep: (c) => { + c.sleep(); + }, + getStatus: (c) => ({ + startCount: c.state.startCount, + sleepCount: c.state.sleepCount, + messageStarted: c.state.messageStarted, + messageFinished: c.state.messageFinished, + }), + getLogEntries: async (c) => { + return await c.db.execute<{ + id: number; + event: string; + created_at: number; + }>(`SELECT * FROM sleep_log ORDER BY id`); + }, + }, + options: { + sleepTimeout: EXCEEDS_GRACE_SLEEP_TIMEOUT, + sleepGracePeriod: EXCEEDS_GRACE_PERIOD, + }, +}); + +// Reproduces the "cannot start a transaction within a transaction" error. +// Multiple concurrent WS message handlers do DB writes. The grace period +// is shorter than the handler delay, so the VFS gets destroyed while +// handlers are still running. The first handler's DB write fails +// (leaving a transaction open in SQLite), and subsequent handlers get +// "cannot start a transaction within a transaction". +export const sleepWsConcurrentDbExceedsGrace = actor({ + state: { + startCount: 0, + sleepCount: 0, + handlerStarted: 0, + handlerFinished: 0, + handlerErrors: [] as string[], + }, + db: db({ + onMigrate: async (db) => { + await db.execute(` + CREATE TABLE IF NOT EXISTS sleep_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + `); + }, + }), + onWake: async (c) => { + c.state.startCount += 1; + await c.db.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('wake', ${Date.now()})`, + ); + }, + onSleep: async (c) => { + c.state.sleepCount += 1; + try { + await c.db.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('sleep', ${Date.now()})`, + ); + } catch { + // DB may already be torn down + } + }, + onWebSocket: (c, ws: UniversalWebSocket) => { + ws.addEventListener("message", async (event: any) => { + const data = JSON.parse(String(event.data)); + if (data.type !== "slow-db-work") return; + + const index = data.index ?? 0; + c.state.handlerStarted += 1; + + // Each handler captures the db reference before awaiting. + // After the delay, the VFS may be destroyed. + const dbRef = c.db; + + ws.send(JSON.stringify({ type: "started", index })); + + // Stagger the delay slightly per index so handlers resume at + // different times relative to VFS teardown. + await new Promise((resolve) => + setTimeout(resolve, EXCEEDS_GRACE_HANDLER_DELAY + index * 50), + ); + + // Use the captured dbRef directly. After VFS teardown, the + // underlying sqlite connection is broken. The first handler + // to hit it may get "disk I/O error" (leaving a transaction + // open), and subsequent handlers may get "cannot start a + // transaction within a transaction". + // + // Do NOT catch the error here. Let it propagate so + // #trackWebSocketCallback logs the actual error message + // (visible in test output as "websocket callback failed"). + await dbRef.execute( + `INSERT INTO sleep_log (event, created_at) VALUES ('handler-${index}-finish', ${Date.now()})`, + ); + c.state.handlerFinished += 1; + }); + + ws.send(JSON.stringify({ type: "connected" })); + }, + actions: { + triggerSleep: (c) => { + c.sleep(); + }, + getStatus: (c) => ({ + startCount: c.state.startCount, + sleepCount: c.state.sleepCount, + handlerStarted: c.state.handlerStarted, + handlerFinished: c.state.handlerFinished, + handlerErrors: c.state.handlerErrors, + }), + getLogEntries: async (c) => { + return await c.db.execute<{ + id: number; + event: string; + created_at: number; + }>(`SELECT * FROM sleep_log ORDER BY id`); + }, + }, + options: { + sleepTimeout: EXCEEDS_GRACE_SLEEP_TIMEOUT, + sleepGracePeriod: EXCEEDS_GRACE_PERIOD, + }, +}); + export const sleepWithRawWsCloseDbListener = actor({ state: { startCount: 0, @@ -881,3 +1190,160 @@ export const sleepWithRawWsCloseDbListener = actor({ sleepTimeout: RAW_WS_HANDLER_SLEEP_TIMEOUT, }, }); + +// Reproduces the production "disk I/O error". Uses the raw IDatabase +// handle (bypassing ensureOpen). The handler opens a transaction via the +// raw handle, waits for shutdown to destroy the VFS, then tries to +// commit. The commit hits the destroyed VFS and fails with the raw SQLite +// error instead of the "Database is closed" guard. +const RAW_DB_GRACE_PERIOD = 100; +const RAW_DB_SLEEP_TIMEOUT = 500; + +export { RAW_DB_GRACE_PERIOD, RAW_DB_SLEEP_TIMEOUT }; + +type LeakedDbClient = { + execute: (query: string) => Promise; + close: () => Promise; + _rawDb: IDatabase; + _poisonKv: () => void; +}; + +function dbWithLeakedHandle() { + return { + createClient: async (ctx: { + actorId: string; + kv: { + batchGet: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>; + batchPut: (entries: [Uint8Array, Uint8Array][]) => Promise; + batchDelete: (keys: Uint8Array[]) => Promise; + }; + sqliteVfs?: { open: (fileName: string, options: KvVfsOptions) => Promise }; + }): Promise => { + let poisoned = false; + const poisonError = () => { + throw new Error("KV transport unavailable (simulated WebSocket closure)"); + }; + + const kvStore: KvVfsOptions = { + get: async (key) => { + if (poisoned) poisonError(); + const results = await ctx.kv.batchGet([key]); + return results[0] ?? null; + }, + getBatch: (keys) => { + if (poisoned) poisonError(); + return ctx.kv.batchGet(keys); + }, + put: (key, value) => { + if (poisoned) poisonError(); + return ctx.kv.batchPut([[key, value]]); + }, + putBatch: (entries) => { + if (poisoned) poisonError(); + return ctx.kv.batchPut(entries); + }, + deleteBatch: (keys) => { + if (poisoned) poisonError(); + return ctx.kv.batchDelete(keys); + }, + }; + + const rawDb = await ctx.sqliteVfs!.open(ctx.actorId, kvStore); + + return { + execute: async (query: string): Promise => { + const results: Record[] = []; + let columnNames: string[] | null = null; + await rawDb.exec(query, (row: unknown[], columns: string[]) => { + if (!columnNames) columnNames = columns; + const rowObj: Record = {}; + for (let i = 0; i < row.length; i++) { + rowObj[columnNames[i]] = row[i]; + } + results.push(rowObj); + }); + return results; + }, + close: async () => { + await rawDb.close(); + }, + _rawDb: rawDb, + _poisonKv: () => { poisoned = true; }, + }; + }, + onMigrate: async (client: LeakedDbClient) => { + await client.execute(` + CREATE TABLE IF NOT EXISTS sleep_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + `); + }, + onDestroy: async (client: LeakedDbClient) => { + await client.close(); + }, + }; +} + +export const sleepWsRawDbAfterClose = actor({ + state: { + startCount: 0, + sleepCount: 0, + }, + db: dbWithLeakedHandle(), + onWake: (c) => { + c.state.startCount += 1; + }, + onSleep: (c) => { + c.state.sleepCount += 1; + }, + onWebSocket: (c, ws: UniversalWebSocket) => { + ws.addEventListener("message", async (event: any) => { + if (event.data !== "raw-db-after-close") return; + + // Access the raw handle and poison function directly from c.db. + // The custom db provider exposes these as _rawDb and _poisonKv. + const dbClient = c.db as unknown as LeakedDbClient; + const rawDb = dbClient._rawDb; + const poisonKv = dbClient._poisonKv; + + // Start a transaction using the raw handle. + await rawDb.exec("BEGIN"); + await rawDb.exec( + `INSERT INTO sleep_log (event, created_at) VALUES ('before-poison', ${Date.now()})`, + ); + + // Poison the KV store to simulate the runner WebSocket dying. + poisonKv(); + + ws.send(JSON.stringify({ type: "started" })); + + // Try to COMMIT. The VFS will try to write pages via KV, + // but KV is poisoned so the write fails. The VFS returns + // SQLITE_IOERR to SQLite, which throws "disk I/O error". + try { + await rawDb.exec("COMMIT"); + ws.send(JSON.stringify({ type: "committed" })); + } catch (error) { + ws.send( + JSON.stringify({ + type: "error", + error: error instanceof Error ? error.message : String(error), + }), + ); + } + }); + + ws.send(JSON.stringify({ type: "connected" })); + }, + actions: { + triggerSleep: (c) => { + c.sleep(); + }, + }, + options: { + sleepTimeout: RAW_DB_SLEEP_TIMEOUT, + sleepGracePeriod: RAW_DB_GRACE_PERIOD, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts index 970a3cd639..def08f4474 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts @@ -54,11 +54,15 @@ export function db({ ); } + let lastVfsError: unknown = null; const kvStore = createActorKvStore( ctx.kv, ctx.metrics, ctx.preloadedEntries, ); + kvStore.onError = (error: unknown) => { + lastVfsError = error; + }; const db = await ctx.sqliteVfs.open(ctx.actorId, kvStore); let closed = false; const mutex = new AsyncMutex(); @@ -93,55 +97,77 @@ export function db({ // Keep using `db.exec` for non-parameterized SQL because it // supports multi-statement migrations. let result: TRow[]; - if (args.length > 0) { - const bindings = - args.length === 1 && - isSqliteBindingObject(args[0]) - ? toSqliteBindings(args[0]) - : toSqliteBindings(args); - const token = query - .trimStart() - .slice(0, 16) - .toUpperCase(); - const returnsRows = - token.startsWith("SELECT") || - token.startsWith("PRAGMA") || - token.startsWith("WITH") || - /\bRETURNING\b/i.test(query); + try { + if (args.length > 0) { + const bindings = + args.length === 1 && + isSqliteBindingObject(args[0]) + ? toSqliteBindings(args[0]) + : toSqliteBindings(args); + const token = query + .trimStart() + .slice(0, 16) + .toUpperCase(); + const returnsRows = + token.startsWith("SELECT") || + token.startsWith("PRAGMA") || + token.startsWith("WITH") || + /\bRETURNING\b/i.test(query); - if (returnsRows) { - const { rows, columns } = await db.query( + if (returnsRows) { + const { rows, columns } = await db.query( + query, + bindings, + ); + result = rows.map((row: unknown[]) => { + const rowObj: Record< + string, + unknown + > = {}; + for ( + let i = 0; + i < columns.length; + i++ + ) { + rowObj[columns[i]] = row[i]; + } + return rowObj; + }) as TRow[]; + } else { + await db.run(query, bindings); + result = [] as TRow[]; + } + } else { + const results: Record[] = []; + let columnNames: string[] | null = null; + await db.exec( query, - bindings, + (row: unknown[], columns: string[]) => { + if (!columnNames) { + columnNames = columns; + } + const rowObj: Record< + string, + unknown + > = {}; + for (let i = 0; i < row.length; i++) { + rowObj[columnNames[i]] = row[i]; + } + results.push(rowObj); + }, ); - result = rows.map((row: unknown[]) => { - const rowObj: Record = {}; - for (let i = 0; i < columns.length; i++) { - rowObj[columns[i]] = row[i]; - } - return rowObj; - }) as TRow[]; - } else { - await db.run(query, bindings); - result = [] as TRow[]; + result = results as TRow[]; } - } else { - const results: Record[] = []; - let columnNames: string[] | null = null; - await db.exec( - query, - (row: unknown[], columns: string[]) => { - if (!columnNames) { - columnNames = columns; - } - const rowObj: Record = {}; - for (let i = 0; i < row.length; i++) { - rowObj[columnNames[i]] = row[i]; - } - results.push(rowObj); - }, - ); - result = results as TRow[]; + } catch (error) { + if (lastVfsError != null) { + const cause = lastVfsError; + lastVfsError = null; + throw new Error( + `Database query failed because the underlying storage is no longer available (${cause instanceof Error ? cause.message : String(cause)}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`, + { cause: cause instanceof Error ? cause : undefined }, + ); + } + throw error; } const durationMs = performance.now() - start; @@ -163,6 +189,13 @@ export function db({ }); }, close: async () => { + // Poison the KV store before acquiring the mutex. If a + // query is in-flight (holding the mutex), its next VFS + // page read/write will fail immediately with a + // descriptive error instead of hanging or producing a + // cryptic "disk I/O error". + kvStore.poison(); + const shouldClose = await mutex.run(async () => { if (closed) return false; closed = true; diff --git a/rivetkit-typescript/packages/rivetkit/src/db/shared.ts b/rivetkit-typescript/packages/rivetkit/src/db/shared.ts index 462b420953..3e7b728683 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/shared.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/shared.ts @@ -89,11 +89,20 @@ export function createActorKvStore( kv: ActorKvOperations, metrics?: ActorMetrics, preloadedEntries?: PreloadedEntries, -): KvVfsOptions & { clearPreload: () => void } { +): KvVfsOptions & { clearPreload: () => void; poison: () => void } { let preload: PreloadedEntries | undefined = preloadedEntries; + let poisoned = false; + const ensureNotPoisoned = () => { + if (poisoned) { + throw new Error( + "Database is shutting down. A query was still in progress when the actor started stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.", + ); + } + }; return { get: async (key: Uint8Array) => { + ensureNotPoisoned(); // Preload hits bypass KV entirely and are not tracked in // kvGet metrics. Only cache misses are counted below. if (preload) { @@ -110,6 +119,7 @@ export function createActorKvStore( return results[0] ?? null; }, getBatch: async (keys: Uint8Array[]) => { + ensureNotPoisoned(); if (!preload || keys.length === 0) { const start = performance.now(); const results = await kv.batchGet(keys); @@ -155,6 +165,7 @@ export function createActorKvStore( return results; }, put: async (key: Uint8Array, value: Uint8Array) => { + ensureNotPoisoned(); const start = performance.now(); await kv.batchPut([[key, value]]); if (metrics) { @@ -164,6 +175,7 @@ export function createActorKvStore( } }, putBatch: async (entries: [Uint8Array, Uint8Array][]) => { + ensureNotPoisoned(); const start = performance.now(); await kv.batchPut(entries); if (metrics) { @@ -173,6 +185,7 @@ export function createActorKvStore( } }, deleteBatch: async (keys: Uint8Array[]) => { + ensureNotPoisoned(); const start = performance.now(); await kv.batchDelete(keys); if (metrics) { @@ -184,6 +197,9 @@ export function createActorKvStore( clearPreload: () => { preload = undefined; }, + poison: () => { + poisoned = true; + }, }; } diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-sleep-db.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-sleep-db.ts index 6958583954..df0da82987 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-sleep-db.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-sleep-db.ts @@ -2,6 +2,14 @@ import { describe, expect, test, vi } from "vitest"; import { RAW_WS_HANDLER_DELAY } from "../../../fixtures/driver-test-suite/sleep"; import { SLEEP_DB_TIMEOUT, + EXCEEDS_GRACE_HANDLER_DELAY, + EXCEEDS_GRACE_PERIOD, + EXCEEDS_GRACE_SLEEP_TIMEOUT, + ACTIVE_DB_WRITE_COUNT, + ACTIVE_DB_WRITE_DELAY_MS, + ACTIVE_DB_GRACE_PERIOD, + ACTIVE_DB_SLEEP_TIMEOUT, + RAW_DB_SLEEP_TIMEOUT, } from "../../../fixtures/driver-test-suite/sleep-db"; import type { DriverTestConfig } from "../mod"; import { setupDriverTest, waitFor } from "../utils"; @@ -838,6 +846,332 @@ export function runActorSleepDbTests(driverTestConfig: DriverTestConfig) { ); expect(events).toContain("alarm"); }); + + test( + "ws handler exceeding grace period should still complete db writes", + async (c) => { + const { client } = await setupDriverTest( + c, + driverTestConfig, + ); + + const actor = + client.sleepWsMessageExceedsGrace.getOrCreate([ + "ws-exceeds-grace", + ]); + const ws = await connectRawWebSocket(actor); + + // Send a message that starts slow async DB work + ws.send("slow-db-work"); + + // Wait for the handler to confirm it started + await new Promise((resolve) => { + const onMessage = (event: MessageEvent) => { + const data = JSON.parse(String(event.data)); + if (data.type === "started") { + ws.removeEventListener( + "message", + onMessage, + ); + resolve(); + } + }; + ws.addEventListener("message", onMessage); + }); + + // Trigger sleep while the handler is still doing slow + // work. The grace period (200ms) is much shorter than the + // handler delay (2000ms), so shutdown will time out and + // clean up the database while the handler is still running. + await actor.triggerSleep(); + + // Wait for the handler to finish and the actor to complete + // its sleep cycle. The handler runs for 2000ms. After that + // the actor sleeps (the timed-out shutdown already ran, but + // the handler promise still resolves in the background). + await waitFor( + driverTestConfig, + EXCEEDS_GRACE_HANDLER_DELAY + + EXCEEDS_GRACE_SLEEP_TIMEOUT + + 500, + ); + + // Wake the actor and check what happened. + const status = await actor.getStatus(); + expect(status.sleepCount).toBeGreaterThanOrEqual(1); + expect(status.startCount).toBeGreaterThanOrEqual(2); + + // The handler started. + expect(status.messageStarted).toBe(1); + + // BUG: The handler's second DB write should succeed, but + // the grace period expired and the database was cleaned up + // before the handler finished. The handler's post-delay + // c.db.execute call runs against a destroyed database, + // so messageFinished is never incremented and "msg-finish" + // is missing from the log. + // + // Correct behavior: the handler should complete and + // msg-finish should appear in the DB. + expect(status.messageFinished).toBe(1); + + const entries = await actor.getLogEntries(); + const events = entries.map( + (e: { event: string }) => e.event, + ); + expect(events).toContain("msg-start"); + expect(events).toContain("msg-finish"); + }, + { timeout: 15_000 }, + ); + + test( + "concurrent ws handlers with cached db ref get errors when grace period exceeded", + async (c) => { + const { client } = await setupDriverTest( + c, + driverTestConfig, + ); + + const actor = + client.sleepWsConcurrentDbExceedsGrace.getOrCreate( + ["ws-concurrent-exceeds-grace"], + ); + const ws = await connectRawWebSocket(actor); + + const MESSAGE_COUNT = 3; + let startedCount = 0; + + // Set up listener for "started" confirmations + const allStarted = new Promise((resolve) => { + const onMessage = (event: MessageEvent) => { + const data = JSON.parse(String(event.data)); + if (data.type === "started") { + startedCount++; + if (startedCount === MESSAGE_COUNT) { + ws.removeEventListener( + "message", + onMessage, + ); + resolve(); + } + } + }; + ws.addEventListener("message", onMessage); + }); + + // Send multiple messages rapidly. Each handler captures + // c.db before awaiting and uses the cached reference after + // the delay. Multiple handlers will try to use the cached + // db reference after VFS teardown. + for (let i = 0; i < MESSAGE_COUNT; i++) { + ws.send( + JSON.stringify({ + type: "slow-db-work", + index: i, + }), + ); + } + + // Wait for all handlers to confirm they started + await allStarted; + + // Trigger sleep while all handlers are doing slow work + await actor.triggerSleep(); + + // Wait for handlers to finish + actor to sleep and wake + await waitFor( + driverTestConfig, + EXCEEDS_GRACE_HANDLER_DELAY + + MESSAGE_COUNT * 50 + + EXCEEDS_GRACE_SLEEP_TIMEOUT + + 500, + ); + + // Wake the actor. All handlers should have completed + // their DB writes successfully. + const status = await actor.getStatus(); + expect(status.sleepCount).toBeGreaterThanOrEqual(1); + expect(status.startCount).toBeGreaterThanOrEqual(2); + expect(status.handlerStarted).toBe(MESSAGE_COUNT); + + // BUG: The handlers' post-delay DB writes fail because + // the grace period expired and the VFS was destroyed. + // With a cached db reference and staggered delays, the + // first handler to resume may get "disk I/O error" and + // leave a transaction open, and subsequent handlers get + // "cannot start a transaction within a transaction". + // + // Correct behavior: all handler DB writes should succeed. + expect(status.handlerFinished).toBe(MESSAGE_COUNT); + expect(status.handlerErrors).toEqual([]); + }, + { timeout: 15_000 }, + ); + + test( + "active db writes interrupted by sleep produce db error", + async (c) => { + const { client } = await setupDriverTest( + c, + driverTestConfig, + ); + + const actor = + client.sleepWsActiveDbExceedsGrace.getOrCreate([ + "ws-active-db-exceeds-grace", + ]); + const ws = await connectRawWebSocket(actor); + + // Listen for error message from the handler. The + // handler sends { type: "error", index, error } over + // the WebSocket when the DB write fails. + const errorPromise = new Promise<{ + index: number; + error: string; + }>((resolve) => { + const onMessage = (event: MessageEvent) => { + const data = JSON.parse(String(event.data)); + if (data.type === "error") { + ws.removeEventListener( + "message", + onMessage, + ); + resolve(data); + } + }; + ws.addEventListener("message", onMessage); + }); + + // Start the write loop + ws.send("start-writes"); + + // Wait for confirmation + await new Promise((resolve) => { + const onMessage = (event: MessageEvent) => { + const data = JSON.parse(String(event.data)); + if (data.type === "started") { + ws.removeEventListener( + "message", + onMessage, + ); + resolve(); + } + }; + ws.addEventListener("message", onMessage); + }); + + // Trigger sleep while writes are in progress. + await actor.triggerSleep(); + + // Wait for the error message from the handler. + const errorData = await errorPromise; + + // The handler's write was interrupted by shutdown. + // With the file-system driver, the c.db getter throws + // ActorStopping because #db is already undefined. With + // the engine driver, the KV transport fails mid-query + // and the VFS onError callback produces a descriptive + // "underlying storage is no longer available" message. + expect(errorData.error).toMatch( + /actor stop|database accessed after|Database is closed|underlying storage/i, + ); + expect(errorData.index).toBeGreaterThan(0); + expect(errorData.index).toBeLessThan( + ACTIVE_DB_WRITE_COUNT, + ); + + // Wait for actor to sleep + wake so we can query it. + await waitFor( + driverTestConfig, + ACTIVE_DB_SLEEP_TIMEOUT + 500, + ); + + // Verify the DB has fewer rows than the full count. + const entries = await actor.getLogEntries(); + const writeEntries = entries.filter( + (e: { event: string }) => + e.event.startsWith("write-"), + ); + expect(writeEntries.length).toBeGreaterThan(0); + expect(writeEntries.length).toBeLessThan( + ACTIVE_DB_WRITE_COUNT, + ); + }, + { timeout: 30_000 }, + ); + + test( + "poisoned KV produces disk I/O error on commit", + async (c) => { + const { client } = await setupDriverTest( + c, + driverTestConfig, + ); + + const actor = + client.sleepWsRawDbAfterClose.getOrCreate([ + `raw-db-${crypto.randomUUID()}`, + ]); + const ws = await connectRawWebSocket(actor); + + // Listen for the error (or committed) message. + const resultPromise = new Promise<{ + type: string; + error?: string; + }>((resolve) => { + const onMessage = (event: MessageEvent) => { + const data = JSON.parse(String(event.data)); + if ( + data.type === "error" || + data.type === "committed" + ) { + ws.removeEventListener( + "message", + onMessage, + ); + resolve(data); + } + }; + ws.addEventListener("message", onMessage); + }); + + // Tell the handler to BEGIN a transaction, poison the + // KV store, then try to COMMIT. + ws.send("raw-db-after-close"); + + // Wait for the handler's result with a timeout. The + // COMMIT may hang if the VFS error causes SQLite's + // pager to enter a retry loop, so we set a deadline. + const result = await Promise.race([ + resultPromise, + new Promise<{ type: string; error?: string }>( + (resolve) => + setTimeout( + () => + resolve({ + type: "timeout", + error: "handler did not respond within 5s", + }), + 5000, + ), + ), + ]); + + // The COMMIT should have failed with a raw SQLite + // error caused by the poisoned KV. The exact message + // depends on which VFS operation fails first: + // "disk I/O error" (xWrite) or "unable to open + // database file" (xOpen during rollback). + expect(result.type).not.toBe("committed"); + if (result.type === "error") { + expect(result.error).toMatch( + /disk I\/O|unable to open|SQLITE_IOERR/i, + ); + } + }, + { timeout: 15_000 }, + ); }, ); } diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/types.ts b/rivetkit-typescript/packages/sqlite-vfs/src/types.ts index 50a1ef24a1..3b0c107793 100644 --- a/rivetkit-typescript/packages/sqlite-vfs/src/types.ts +++ b/rivetkit-typescript/packages/sqlite-vfs/src/types.ts @@ -9,4 +9,10 @@ export interface KvVfsOptions { putBatch: (entries: [Uint8Array, Uint8Array][]) => Promise; /** Delete multiple keys */ deleteBatch: (keys: Uint8Array[]) => Promise; + /** + * Called when a KV operation fails inside a VFS callback. The VFS must + * return a generic SQLite error code to the pager, so the original error + * is lost unless the caller captures it through this callback. + */ + onError?: (error: unknown) => void; } diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts index 841b44922d..1473ce9110 100644 --- a/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts +++ b/rivetkit-typescript/packages/sqlite-vfs/src/vfs.ts @@ -840,7 +840,8 @@ class SqliteSystem implements SqliteVfsRegistration { let sizeData: Uint8Array | null; try { sizeData = await options.get(metaKey); - } catch { + } catch (error) { + options.onError?.(error); return VFS.SQLITE_CANTOPEN; } @@ -872,7 +873,8 @@ class SqliteSystem implements SqliteVfsRegistration { [chunkKey, EMPTY_DB_PAGE], [metaKey, encodeFileMeta(size)], ]); - } catch { + } catch (error) { + options.onError?.(error); return VFS.SQLITE_CANTOPEN; } } else { @@ -880,7 +882,8 @@ class SqliteSystem implements SqliteVfsRegistration { size = 0; try { await options.put(metaKey, encodeFileMeta(size)); - } catch { + } catch (error) { + options.onError?.(error); return VFS.SQLITE_CANTOPEN; } } @@ -924,8 +927,9 @@ class SqliteSystem implements SqliteVfsRegistration { await file.options.put(file.metaKey, encodeFileMeta(file.size)); file.metaDirty = false; } - } catch { + } catch (error) { // Always clean up the file handle even if the KV operation fails. + file.options.onError?.(error); this.#openFiles.delete(fileId); return VFS.SQLITE_IOERR; } @@ -990,7 +994,8 @@ class SqliteSystem implements SqliteVfsRegistration { try { kvChunks = chunkKeys.length > 0 ? await options.getBatch(chunkKeys) : []; - } catch { + } catch (error) { + options.onError?.(error); return VFS.SQLITE_IOERR_READ; } @@ -1153,7 +1158,8 @@ class SqliteSystem implements SqliteVfsRegistration { chunkKeysToFetch.length > 0 ? await options.getBatch(chunkKeysToFetch) : []; - } catch { + } catch (error) { + options.onError?.(error); return VFS.SQLITE_IOERR_WRITE; } @@ -1206,7 +1212,8 @@ class SqliteSystem implements SqliteVfsRegistration { // Write all chunks and metadata try { await options.putBatch(entriesToWrite); - } catch { + } catch (error) { + options.onError?.(error); file.size = previousSize; file.metaDirty = previousMetaDirty; return VFS.SQLITE_IOERR_WRITE; @@ -1243,7 +1250,8 @@ class SqliteSystem implements SqliteVfsRegistration { file.metaDirty = true; try { await options.put(file.metaKey, encodeFileMeta(file.size)); - } catch { + } catch (error) { + options.onError?.(error); file.size = previousSize; file.metaDirty = previousMetaDirty; return VFS.SQLITE_IOERR_TRUNCATE; @@ -1267,7 +1275,8 @@ class SqliteSystem implements SqliteVfsRegistration { file.metaDirty = true; try { await options.put(file.metaKey, encodeFileMeta(file.size)); - } catch { + } catch (error) { + options.onError?.(error); file.size = previousSize; file.metaDirty = previousMetaDirty; return VFS.SQLITE_IOERR_TRUNCATE; @@ -1303,7 +1312,8 @@ class SqliteSystem implements SqliteVfsRegistration { keysToDelete.slice(b, b + KV_MAX_BATCH_KEYS), ); } - } catch { + } catch (error) { + options.onError?.(error); return VFS.SQLITE_IOERR_TRUNCATE; } @@ -1318,7 +1328,8 @@ class SqliteSystem implements SqliteVfsRegistration { try { await file.options.put(file.metaKey, encodeFileMeta(file.size)); - } catch { + } catch (error) { + file.options.onError?.(error); return VFS.SQLITE_IOERR_FSYNC; } file.metaDirty = false; @@ -1343,7 +1354,10 @@ class SqliteSystem implements SqliteVfsRegistration { ): Promise { try { await this.#delete(this.#module.UTF8ToString(zName)); - } catch { + } catch (error) { + // xDelete doesn't have a file handle, so we can't resolve + // options.onError here. The error is still surfaced by + // SQLite as SQLITE_IOERR_DELETE. return VFS.SQLITE_IOERR_DELETE; } return VFS.SQLITE_OK; @@ -1401,7 +1415,8 @@ class SqliteSystem implements SqliteVfsRegistration { let metaData: Uint8Array | null; try { metaData = await resolved.options.get(compactMetaKey); - } catch { + } catch (error) { + resolved.options.onError?.(error); return VFS.SQLITE_IOERR_ACCESS; } @@ -1474,7 +1489,8 @@ class SqliteSystem implements SqliteVfsRegistration { try { await options.putBatch(entries); - } catch { + } catch (error) { + options.onError?.(error); file.dirtyBuffer = null; file.size = file.savedFileSize; file.metaDirty = false;