From d9a284cb1bc5fc378289d532fa2c0c514d4322bd Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 28 May 2026 17:48:03 -0700 Subject: [PATCH 1/3] [world-local] Reduce sequential replay I/O --- .changeset/quick-local-replay.md | 5 + packages/world-local/src/fs.test.ts | 44 ++++++ packages/world-local/src/fs.ts | 62 ++++++-- packages/world-local/src/index.ts | 9 +- packages/world-local/src/storage.test.ts | 105 +++++++++++++- .../world-local/src/storage/events-storage.ts | 133 +++++++++++++++--- packages/world-local/src/storage/index.ts | 6 +- packages/world-local/src/tag.test.ts | 26 ++++ 8 files changed, 356 insertions(+), 34 deletions(-) create mode 100644 .changeset/quick-local-replay.md diff --git a/.changeset/quick-local-replay.md b/.changeset/quick-local-replay.md new file mode 100644 index 0000000000..250c371b28 --- /dev/null +++ b/.changeset/quick-local-replay.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-local': patch +--- + +Reduce local sequential-step replay I/O with bounded recent-event and storage-directory caches. diff --git a/packages/world-local/src/fs.test.ts b/packages/world-local/src/fs.test.ts index c264a11853..0ab50171c0 100644 --- a/packages/world-local/src/fs.test.ts +++ b/packages/world-local/src/fs.test.ts @@ -17,6 +17,8 @@ import { import { z } from 'zod'; import { assertSafeEntityId, + clearCreatedFilesCache, + ensureDir, paginatedFileSystemQuery, readFirstByte, readJSONWithFallback, @@ -24,6 +26,7 @@ import { taggedPath, UnsafeEntityIdError, ulidToDate, + writeExclusive, writeJSON, } from './fs.js'; @@ -113,6 +116,47 @@ describe('fs utilities', () => { }); }); + describe('ensureDir', () => { + it('does not repeat mkdir for a directory created by this process', async () => { + clearCreatedFilesCache(); + const nestedDir = path.join(testDir, 'events'); + const mkdirSpy = vi.spyOn(fs, 'mkdir'); + + await ensureDir(nestedDir); + await ensureDir(nestedDir); + + expect( + mkdirSpy.mock.calls.filter(([dirPath]) => dirPath === nestedDir) + ).toHaveLength(1); + }); + + it('recreates a cached directory removed before an atomic write', async () => { + clearCreatedFilesCache(); + const eventsDir = path.join(testDir, 'events'); + const firstPath = path.join(eventsDir, 'first.json'); + const secondPath = path.join(eventsDir, 'second.json'); + + await writeJSON(firstPath, { value: 'first' }); + await fs.rm(eventsDir, { recursive: true, force: true }); + await writeJSON(secondPath, { value: 'second' }); + + expect(JSON.parse(await fs.readFile(secondPath, 'utf8'))).toEqual({ + value: 'second', + }); + }); + + it('recreates a cached directory removed before an exclusive write', async () => { + clearCreatedFilesCache(); + const locksDir = path.join(testDir, '.locks'); + + expect(await writeExclusive(path.join(locksDir, 'first'), '')).toBe(true); + await fs.rm(locksDir, { recursive: true, force: true }); + expect(await writeExclusive(path.join(locksDir, 'second'), '')).toBe( + true + ); + }); + }); + describe('paginatedFileSystemQuery', () => { // Simple getCreatedAt function that strips .json and tries to parse as ULID const getCreatedAt = (filename: string): Date | null => { diff --git a/packages/world-local/src/fs.ts b/packages/world-local/src/fs.ts index 22cb3d942b..baadc82ec5 100644 --- a/packages/world-local/src/fs.ts +++ b/packages/world-local/src/fs.ts @@ -128,12 +128,16 @@ async function withWindowsRetry( // In-memory cache of created files to avoid expensive fs.access() calls // This is safe because we only write once per file path (no overwrites without explicit flag) const createdFilesCache = new Set(); +// Writes repeatedly target a small fixed set of entity directories. Once one +// exists in this process, avoid another recursive mkdir syscall per event. +const createdDirectoriesCache = new Set(); /** - * Clear the created files cache. Useful for testing or when files are deleted externally. + * Clear write-path caches. Useful for testing or when files are deleted externally. */ export function clearCreatedFilesCache(): void { createdFilesCache.clear(); + createdDirectoriesCache.clear(); } export { ulidToDate } from '@workflow/world'; @@ -252,13 +256,38 @@ export async function listTaggedFilesByExtension( } export async function ensureDir(dirPath: string): Promise { + const resolvedPath = path.resolve(dirPath); + if (createdDirectoriesCache.has(resolvedPath)) { + return; + } try { - await fs.mkdir(dirPath, { recursive: true }); + await fs.mkdir(resolvedPath, { recursive: true }); + createdDirectoriesCache.add(resolvedPath); } catch (_error) { // Ignore if already exists } } +async function withEnsuredDirectory( + dirPath: string, + operation: () => Promise +): Promise { + await ensureDir(dirPath); + try { + return await operation(); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + throw error; + } + + // A dev server may outlive an external cleanup of its data directory. + // Forget the cached directory and retry once after recreating it. + createdDirectoriesCache.delete(path.resolve(dirPath)); + await ensureDir(dirPath); + return operation(); + } +} + interface WriteOptions { overwrite?: boolean; } @@ -342,10 +371,11 @@ export async function write( const tempPath = `${filePath}.tmp.${ulid()}`; let tempFileCreated = false; try { - await ensureDir(path.dirname(filePath)); - await fs.writeFile(tempPath, data); - tempFileCreated = true; - await withWindowsRetry(() => fs.rename(tempPath, filePath)); + await withEnsuredDirectory(path.dirname(filePath), async () => { + await fs.writeFile(tempPath, data); + tempFileCreated = true; + await withWindowsRetry(() => fs.rename(tempPath, filePath)); + }); // Track this file in cache so future writes know it exists createdFilesCache.add(filePath); } catch (error) { @@ -405,10 +435,11 @@ export async function writeExclusive( filePath: string, data: string ): Promise { - await ensureDir(path.dirname(filePath)); try { - await fs.writeFile(filePath, data, { flag: 'wx' }); - return true; + return await withEnsuredDirectory(path.dirname(filePath), async () => { + await fs.writeFile(filePath, data, { flag: 'wx' }); + return true; + }); } catch (error: any) { if (error.code === 'EEXIST') { return false; @@ -439,6 +470,12 @@ export async function listFilesByExtension( interface PaginatedFileSystemQueryConfig { directory: string; schema: z.ZodType; + /** + * Optional immutable-item cache, keyed by absolute file path. Event files + * are append-only, so world-local can replay events it just persisted + * without rereading and reparsing them from disk. + */ + cachedItems?: ReadonlyMap; filePrefix?: string; fileIdFilter?: (fileId: string) => boolean; filter?: (item: T) => boolean; @@ -474,6 +511,7 @@ export async function paginatedFileSystemQuery( const { directory, schema, + cachedItems, filePrefix, fileIdFilter, filter, @@ -544,7 +582,11 @@ export async function paginatedFileSystemQuery( const filePath = path.join(directory, `${fileId}.json`); let item: T | null = null; try { - item = await readJSON(filePath, schema); + const cachedItem = cachedItems?.get(filePath); + item = + cachedItem === undefined + ? await readJSON(filePath, schema) + : structuredClone(cachedItem); } catch (error: unknown) { // We don't expect zod errors to happen, but if the JSON does get malformed, // we skip the item. Preferably, we'd have a way to mark items as malformed, diff --git a/packages/world-local/src/index.ts b/packages/world-local/src/index.ts index e2d15bedf5..76cec93121 100644 --- a/packages/world-local/src/index.ts +++ b/packages/world-local/src/index.ts @@ -64,7 +64,10 @@ export function createLocalWorld(args?: Partial): LocalWorld { const mergedConfig = { ...config.value, ...definedArgs }; const tag = mergedConfig.tag; const queue = createQueue(mergedConfig); - const storage = createStorage(mergedConfig.dataDir, tag); + const { clearCache: clearStorageCache, ...storage } = createStorage( + mergedConfig.dataDir, + tag + ); const recoverActiveRuns = mergedConfig.recoverActiveRuns ?? true; return { specVersion: SPEC_VERSION_CURRENT, @@ -94,9 +97,11 @@ export function createLocalWorld(args?: Partial): LocalWorld { await reenqueueActiveRuns(recoveryRuns, queue.queue, 'world-local'); }, async close() { + clearStorageCache(); await queue.close(); }, async clear() { + clearStorageCache(); if (tag) { // Selectively delete only files matching this tag const basedir = mergedConfig.dataDir; @@ -159,6 +164,8 @@ export function createLocalWorld(args?: Partial): LocalWorld { // Clear the in-memory write cache so deleted paths are forgotten clearCreatedFilesCache(); } else { + // `rm()` removes directories that the write path may have cached. + clearCreatedFilesCache(); await rm(mergedConfig.dataDir, { recursive: true, force: true }); await initDataDir(mergedConfig.dataDir); } diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index c885f16132..eb7301e879 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -5,7 +5,7 @@ import { WorkflowWorldError } from '@workflow/errors'; import type { Event, Storage } from '@workflow/world'; import { stripEventDataRefs } from '@workflow/world'; import { monotonicFactory } from 'ulid'; -import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { writeJSON } from './fs.js'; import { hashToken } from './storage/helpers.js'; import { createStorage } from './storage.js'; @@ -1301,6 +1301,109 @@ describe('Storage', () => { expect(result.data[3].eventType).toBe('hook_disposed'); }); }); + + it('reuses locally appended events without exposing cached instances', async () => { + const created = await storage.events.create(null, { + eventType: 'run_created', + eventData: { + deploymentId: 'deployment-cache', + workflowName: 'cached-event-workflow', + input: new Uint8Array([1]), + }, + }); + const runId = created.event.runId; + (created.event as any).eventData.input[0] = 9; + const readFileSpy = vi.spyOn(fs, 'readFile'); + + const first = await storage.events.list({ runId }); + const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) => + String(filePath).includes(`${path.sep}events${path.sep}`) + ); + expect(eventFileReads).toHaveLength(0); + expect((first.data[0] as any).eventData.input).toEqual( + new Uint8Array([1]) + ); + + (first.data[0] as { eventType: string }).eventType = 'run_failed'; + const second = await storage.events.list({ runId }); + expect(second.data[0]?.eventType).toBe('run_created'); + }); + + it('reads oversized event payloads from disk instead of retaining them', async () => { + const created = await storage.events.create(null, { + eventType: 'run_created', + eventData: { + deploymentId: 'deployment-large', + workflowName: 'large-event-workflow', + input: new Uint8Array(4 * 1024 * 1024), + }, + }); + const readFileSpy = vi.spyOn(fs, 'readFile'); + + await storage.events.list({ runId: created.event.runId }); + + const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) => + String(filePath).includes(`${path.sep}events${path.sep}`) + ); + expect(eventFileReads.length).toBeGreaterThan(0); + }); + + it('normalizes cached event metadata the same way as disk reads', async () => { + const created = await storage.events.create(null, { + eventType: 'run_created', + eventData: { + deploymentId: 'deployment-normalized', + workflowName: 'normalized-cache-workflow', + input: new Uint8Array([1]), + executionContext: { + timestamp: new Date('2026-01-01T00:00:00.000Z'), + }, + }, + }); + + const page = await storage.events.list({ runId: created.event.runId }); + + expect((page.data[0] as any).eventData.executionContext.timestamp).toBe( + '2026-01-01T00:00:00.000Z' + ); + }); + + it('allows active-event cache contents to be explicitly released', async () => { + const localStorage = createStorage(testDir); + const run = await createRun(localStorage, { + deploymentId: 'deployment-clear', + workflowName: 'cleared-cache-workflow', + input: new Uint8Array([1]), + }); + localStorage.clearCache(); + const readFileSpy = vi.spyOn(fs, 'readFile'); + + await localStorage.events.list({ runId: run.runId }); + + const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) => + String(filePath).includes(`${path.sep}events${path.sep}`) + ); + expect(eventFileReads.length).toBeGreaterThan(0); + }); + + it('releases locally cached events after a run completes', async () => { + const run = await createRun(storage, { + deploymentId: 'deployment-complete', + workflowName: 'completed-cache-workflow', + input: new Uint8Array([1]), + }); + await updateRun(storage, run.runId, 'run_completed', { + output: new Uint8Array([2]), + }); + const readFileSpy = vi.spyOn(fs, 'readFile'); + + await storage.events.list({ runId: run.runId }); + + const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) => + String(filePath).includes(`${path.sep}events${path.sep}`) + ); + expect(eventFileReads.length).toBeGreaterThan(0); + }); }); describe('hooks', () => { diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 31740df540..0a3d513713 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -36,12 +36,14 @@ import { assertSafeEntityId, deleteJSON, jsonReplacer, + jsonReviver, listJSONFiles, paginatedFileSystemQuery, readJSON, readJSONWithFallback, resolveWithinBase, taggedPath, + write, writeExclusive, writeJSON, } from '../fs.js'; @@ -164,11 +166,113 @@ async function writeRunUnderLifecycleLock( * Creates the events storage implementation using the filesystem. * Implements the Storage['events'] interface with create, list, and listByCorrelationId operations. */ +export type LocalEventsStorage = Storage['events'] & { + clearCache(): void; +}; + export function createEventsStorage( basedir: string, tag?: string -): Storage['events'] { +): LocalEventsStorage { + // Events are append-only. Keep a bounded window of locally persisted events + // available to immediate replay without rereading JSON files. Payload bytes + // and entry count are both bounded so active/waiting runs cannot retain + // unbounded histories in a long-lived development server. + const maxCachedEventBytes = 4 * 1024 * 1024; + const maxCachedEventEntries = 1000; + const eventCache = new Map(); + const cachedEventBytes = new Map(); + const cachedPathsByRunId = new Map>(); + let totalCachedEventBytes = 0; + + function deleteCachedEvent(eventPath: string): void { + const event = eventCache.get(eventPath); + if (!event) { + return; + } + eventCache.delete(eventPath); + totalCachedEventBytes -= cachedEventBytes.get(eventPath) ?? 0; + cachedEventBytes.delete(eventPath); + const cachedPaths = cachedPathsByRunId.get(event.runId); + cachedPaths?.delete(eventPath); + if (cachedPaths?.size === 0) { + cachedPathsByRunId.delete(event.runId); + } + } + + function clearRunCache(runId: string): void { + for (const cachedPath of cachedPathsByRunId.get(runId) ?? []) { + deleteCachedEvent(cachedPath); + } + } + + function clearCache(): void { + eventCache.clear(); + cachedEventBytes.clear(); + cachedPathsByRunId.clear(); + totalCachedEventBytes = 0; + } + + function cacheEvent( + eventPath: string, + cachedEvent: Event, + serializedBytes: number + ): void { + if (serializedBytes > maxCachedEventBytes) { + return; + } + + while ( + eventCache.size > 0 && + (eventCache.size >= maxCachedEventEntries || + totalCachedEventBytes + serializedBytes > maxCachedEventBytes) + ) { + const oldestPath = eventCache.keys().next().value as string; + deleteCachedEvent(oldestPath); + } + + eventCache.set(eventPath, cachedEvent); + cachedEventBytes.set(eventPath, serializedBytes); + totalCachedEventBytes += serializedBytes; + const cachedPaths = + cachedPathsByRunId.get(cachedEvent.runId) ?? new Set(); + cachedPaths.add(eventPath); + cachedPathsByRunId.set(cachedEvent.runId, cachedPaths); + } + + async function storeEvent(event: Event): Promise { + const eventPath = taggedPath( + basedir, + 'events', + `${event.runId}-${event.eventId}`, + tag + ); + const serializedEvent = JSON.stringify(event, jsonReplacer, 2); + const serializedBytes = Buffer.byteLength(serializedEvent); + // Decode the serialized snapshot before the asynchronous write yields. + // This detaches caller-owned payloads and matches disk read normalization. + const cachedEvent = + serializedBytes <= maxCachedEventBytes + ? EventSchema.safeParse(JSON.parse(serializedEvent, jsonReviver)) + : undefined; + await write(eventPath, serializedEvent); + + if ( + event.eventType === 'run_completed' || + event.eventType === 'run_failed' || + event.eventType === 'run_cancelled' + ) { + clearRunCache(event.runId); + return; + } + + if (cachedEvent?.success) { + cacheEvent(eventPath, cachedEvent.data, serializedBytes); + } + } + return { + clearCache, async create(runId, data, params): Promise { // Validate request-supplied IDs before they're concatenated into // filesystem paths. This is the primary defense against path traversal @@ -324,11 +428,7 @@ export function createEventsStorage( executionContext: runInputData.executionContext, }, }; - const createdCompositeKey = `${effectiveRunId}-${runCreatedEventId}`; - await writeJSON( - taggedPath(basedir, 'events', createdCompositeKey, tag), - runCreatedEvent - ); + await storeEvent(runCreatedEvent); currentRun = createdRun; } else { // Run already exists (concurrent run_created won the @@ -404,11 +504,7 @@ export function createEventsStorage( createdAt: now, specVersion: effectiveSpecVersion, }; - const compositeKey = `${effectiveRunId}-${eventId}`; - await writeJSON( - taggedPath(basedir, 'events', compositeKey, tag), - event - ); + await storeEvent(event); const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; return { @@ -962,11 +1058,7 @@ export function createEventsStorage( }; // Store the conflict event - const compositeKey = `${effectiveRunId}-${eventId}`; - await writeJSON( - taggedPath(basedir, 'events', compositeKey, tag), - conflictEvent - ); + await storeEvent(conflictEvent); const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; @@ -1140,11 +1232,7 @@ export function createEventsStorage( // modify the Hook entity (which doesn't have a payload field) // Store event using composite key {runId}-{eventId} - const compositeKey = `${effectiveRunId}-${eventId}`; - await writeJSON( - taggedPath(basedir, 'events', compositeKey, tag), - event - ); + await storeEvent(event); const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; const filteredEvent = stripEventDataRefs(event, resolveData); @@ -1158,6 +1246,7 @@ export function createEventsStorage( const allEvents = await paginatedFileSystemQuery({ directory: path.join(basedir, 'events'), schema: EventSchema, + cachedItems: eventCache, filePrefix: `${effectiveRunId}-`, sortOrder: 'asc', limit: 1000, @@ -1208,6 +1297,7 @@ export function createEventsStorage( const result = await paginatedFileSystemQuery({ directory: path.join(basedir, 'events'), schema: EventSchema, + cachedItems: eventCache, filePrefix: `${runId}-`, // Events in chronological order (oldest first) by default, // different from the default for other list calls. @@ -1238,6 +1328,7 @@ export function createEventsStorage( const result = await paginatedFileSystemQuery({ directory: path.join(basedir, 'events'), schema: EventSchema, + cachedItems: eventCache, // No filePrefix - search all events filter: (event) => event.correlationId === correlationId, // Events in chronological order (oldest first) by default, diff --git a/packages/world-local/src/storage/index.ts b/packages/world-local/src/storage/index.ts index 2bdcd85b3e..48b492d694 100644 --- a/packages/world-local/src/storage/index.ts +++ b/packages/world-local/src/storage/index.ts @@ -10,7 +10,10 @@ import { createStepsStorage } from './steps-storage.js'; * exposes the internal `fileIdFilter` option on `list()`. Structurally * assignable to `Storage` at public boundaries (e.g., `reenqueueActiveRuns`). */ -export type LocalStorage = Omit & { runs: LocalRunsStorage }; +export type LocalStorage = Omit & { + runs: LocalRunsStorage; + clearCache(): void; +}; /** * Creates a complete storage implementation using the filesystem. @@ -35,5 +38,6 @@ export function createStorage(basedir: string, tag?: string): LocalStorage { steps: instrumentObject('world.steps', steps), events: instrumentObject('world.events', events), hooks: instrumentObject('world.hooks', hooks), + clearCache: () => events.clearCache(), }; } diff --git a/packages/world-local/src/tag.test.ts b/packages/world-local/src/tag.test.ts index 6376ae9032..0ec9c39aff 100644 --- a/packages/world-local/src/tag.test.ts +++ b/packages/world-local/src/tag.test.ts @@ -353,6 +353,32 @@ describe('File tagging', () => { }); }); + describe('untagged clear()', () => { + it('can write new entities after clearing cached directories', async () => { + const { createLocalWorld } = await import('./index.js'); + const world = createLocalWorld({ dataDir: testDir }); + await world.start?.(); + + await createRun(world, { + deploymentId: 'dep-before-clear', + workflowName: 'before-clear', + input: new Uint8Array(), + }); + await world.clear(); + + const run = await createRun(world, { + deploymentId: 'dep-after-clear', + workflowName: 'after-clear', + input: new Uint8Array(), + }); + + expect((await world.runs.get(run.runId)).workflowName).toBe( + 'after-clear' + ); + await world.close?.(); + }); + }); + describe('full lifecycle with tags', () => { it('should support complete run lifecycle through tagged storage', async () => { const storage = createStorage(testDir, 'vitest-0'); From d175b349f4f9286e77c5005402530453b2eb5137 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 29 May 2026 15:05:15 -0700 Subject: [PATCH 2/3] Fix relative local event cache lookups --- packages/world-local/src/fs.ts | 6 +- packages/world-local/src/storage.test.ts | 81 ++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/packages/world-local/src/fs.ts b/packages/world-local/src/fs.ts index baadc82ec5..4bc53690ad 100644 --- a/packages/world-local/src/fs.ts +++ b/packages/world-local/src/fs.ts @@ -531,8 +531,10 @@ export async function paginatedFileSystemQuery( assertSafeEntityId('filePrefix', filePrefix); } + const resolvedDirectory = path.resolve(directory); + // 1. Get all JSON files in directory - const fileIds = await listJSONFiles(directory); + const fileIds = await listJSONFiles(resolvedDirectory); // 2. Filter by prefix if provided const relevantFileIds = filePrefix @@ -579,7 +581,7 @@ export async function paginatedFileSystemQuery( const validItems: T[] = []; for (const fileId of candidateFileIds) { - const filePath = path.join(directory, `${fileId}.json`); + const filePath = path.join(resolvedDirectory, `${fileId}.json`); let item: T | null = null; try { const cachedItem = cachedItems?.get(filePath); diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index eb7301e879..e1c17f9476 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -151,6 +151,7 @@ describe('Storage', () => { }); afterEach(async () => { + vi.restoreAllMocks(); // Clean up test dir await fs.rm(testDir, { recursive: true, force: true }); }); @@ -1329,6 +1330,62 @@ describe('Storage', () => { expect(second.data[0]?.eventType).toBe('run_created'); }); + it('reuses sequential-step events with a relative data directory', async () => { + const relativeStorage = createStorage( + path.relative(process.cwd(), testDir) + ); + const run = await createRun(relativeStorage, { + deploymentId: 'deployment-relative-cache', + workflowName: 'relative-cache-workflow', + input: new Uint8Array([1]), + }); + await updateRun(relativeStorage, run.runId, 'run_started'); + const readFileSpy = vi.spyOn(fs, 'readFile'); + + for (let i = 0; i < 5; i++) { + const stepId = `relative_step_${i}`; + await createStep(relativeStorage, run.runId, { + stepId, + stepName: `step-${i}`, + input: new Uint8Array([i]), + }); + await updateStep(relativeStorage, run.runId, stepId, 'step_started'); + await updateStep(relativeStorage, run.runId, stepId, 'step_completed', { + result: new Uint8Array([i]), + }); + + const events = await relativeStorage.events.list({ runId: run.runId }); + expect(events.data).toHaveLength(2 + (i + 1) * 3); + } + + const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) => + String(filePath).includes(`${path.sep}events${path.sep}`) + ); + expect(eventFileReads).toHaveLength(0); + }); + + it('reuses locally appended events for correlation queries', async () => { + const stepId = 'cached-correlation-step'; + await createStep(storage, testRunId, { + stepId, + stepName: 'cached-correlation-step', + input: new Uint8Array([1]), + }); + await updateStep(storage, testRunId, stepId, 'step_started'); + const readFileSpy = vi.spyOn(fs, 'readFile'); + + const events = await storage.events.listByCorrelationId({ + correlationId: stepId, + pagination: {}, + }); + + const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) => + String(filePath).includes(`${path.sep}events${path.sep}`) + ); + expect(events.data).toHaveLength(2); + expect(eventFileReads).toHaveLength(0); + }); + it('reads oversized event payloads from disk instead of retaining them', async () => { const created = await storage.events.create(null, { eventType: 'run_created', @@ -1348,6 +1405,30 @@ describe('Storage', () => { expect(eventFileReads.length).toBeGreaterThan(0); }); + it('evicts old events once the recent-event entry bound is exceeded', async () => { + const hookId = 'bounded-cache-hook'; + await createHook(storage, testRunId, { + hookId, + token: 'bounded-cache-token', + }); + + for (let i = 0; i < 1001; i++) { + await storage.events.create(testRunId, { + eventType: 'hook_received', + correlationId: hookId, + eventData: { payload: new Uint8Array([i % 256]) }, + }); + } + + const readFileSpy = vi.spyOn(fs, 'readFile'); + await storage.events.list({ runId: testRunId }); + + const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) => + String(filePath).includes(`${path.sep}events${path.sep}`) + ); + expect(eventFileReads.length).toBeGreaterThan(0); + }); + it('normalizes cached event metadata the same way as disk reads', async () => { const created = await storage.events.create(null, { eventType: 'run_created', From f9c47dac978b1555d970866558bf3335f1d624a2 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 29 May 2026 15:32:20 -0700 Subject: [PATCH 3/3] Keep event cache eviction test lightweight --- packages/world-local/src/storage.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index e1c17f9476..d97bd11109 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -1405,18 +1405,18 @@ describe('Storage', () => { expect(eventFileReads.length).toBeGreaterThan(0); }); - it('evicts old events once the recent-event entry bound is exceeded', async () => { + it('evicts old events once the recent-event byte bound is exceeded', async () => { const hookId = 'bounded-cache-hook'; await createHook(storage, testRunId, { hookId, token: 'bounded-cache-token', }); - for (let i = 0; i < 1001; i++) { + for (let i = 0; i < 4; i++) { await storage.events.create(testRunId, { eventType: 'hook_received', correlationId: hookId, - eventData: { payload: new Uint8Array([i % 256]) }, + eventData: { payload: new Uint8Array(1024 * 1024) }, }); }