Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quick-local-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-local': patch
---

Reduce local sequential-step replay I/O with bounded recent-event and storage-directory caches.
44 changes: 44 additions & 0 deletions packages/world-local/src/fs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import {
import { z } from 'zod';
import {
assertSafeEntityId,
clearCreatedFilesCache,
ensureDir,
paginatedFileSystemQuery,
readFirstByte,
readJSONWithFallback,
resolveWithinBase,
taggedPath,
UnsafeEntityIdError,
ulidToDate,
writeExclusive,
writeJSON,
} from './fs.js';

Expand Down Expand Up @@ -113,6 +116,47 @@ describe('fs utilities', () => {
});
});

describe('ensureDir', () => {
it('does not repeat mkdir for a directory created by this process', async () => {
clearCreatedFilesCache();
const nestedDir = path.join(testDir, 'events');
const mkdirSpy = vi.spyOn(fs, 'mkdir');

await ensureDir(nestedDir);
await ensureDir(nestedDir);

expect(
mkdirSpy.mock.calls.filter(([dirPath]) => dirPath === nestedDir)
).toHaveLength(1);
});

it('recreates a cached directory removed before an atomic write', async () => {
clearCreatedFilesCache();
const eventsDir = path.join(testDir, 'events');
const firstPath = path.join(eventsDir, 'first.json');
const secondPath = path.join(eventsDir, 'second.json');

await writeJSON(firstPath, { value: 'first' });
await fs.rm(eventsDir, { recursive: true, force: true });
await writeJSON(secondPath, { value: 'second' });

expect(JSON.parse(await fs.readFile(secondPath, 'utf8'))).toEqual({
value: 'second',
});
});

it('recreates a cached directory removed before an exclusive write', async () => {
clearCreatedFilesCache();
const locksDir = path.join(testDir, '.locks');

expect(await writeExclusive(path.join(locksDir, 'first'), '')).toBe(true);
await fs.rm(locksDir, { recursive: true, force: true });
expect(await writeExclusive(path.join(locksDir, 'second'), '')).toBe(
true
);
});
});

describe('paginatedFileSystemQuery', () => {
// Simple getCreatedAt function that strips .json and tries to parse as ULID
const getCreatedAt = (filename: string): Date | null => {
Expand Down
68 changes: 56 additions & 12 deletions packages/world-local/src/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,16 @@ async function withWindowsRetry<T>(
// In-memory cache of created files to avoid expensive fs.access() calls
// This is safe because we only write once per file path (no overwrites without explicit flag)
const createdFilesCache = new Set<string>();
// Writes repeatedly target a small fixed set of entity directories. Once one
// exists in this process, avoid another recursive mkdir syscall per event.
const createdDirectoriesCache = new Set<string>();

/**
* Clear the created files cache. Useful for testing or when files are deleted externally.
* Clear write-path caches. Useful for testing or when files are deleted externally.
*/
export function clearCreatedFilesCache(): void {
createdFilesCache.clear();
createdDirectoriesCache.clear();
}

export { ulidToDate } from '@workflow/world';
Expand Down Expand Up @@ -252,13 +256,38 @@ export async function listTaggedFilesByExtension(
}

export async function ensureDir(dirPath: string): Promise<void> {
const resolvedPath = path.resolve(dirPath);
if (createdDirectoriesCache.has(resolvedPath)) {
return;
}
try {
await fs.mkdir(dirPath, { recursive: true });
await fs.mkdir(resolvedPath, { recursive: true });
createdDirectoriesCache.add(resolvedPath);
} catch (_error) {
// Ignore if already exists
}
}

async function withEnsuredDirectory<T>(
dirPath: string,
operation: () => Promise<T>
): Promise<T> {
await ensureDir(dirPath);
try {
return await operation();
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
throw error;
}

// A dev server may outlive an external cleanup of its data directory.
// Forget the cached directory and retry once after recreating it.
createdDirectoriesCache.delete(path.resolve(dirPath));
await ensureDir(dirPath);
return operation();
}
}

interface WriteOptions {
overwrite?: boolean;
}
Expand Down Expand Up @@ -342,10 +371,11 @@ export async function write(
const tempPath = `${filePath}.tmp.${ulid()}`;
let tempFileCreated = false;
try {
await ensureDir(path.dirname(filePath));
await fs.writeFile(tempPath, data);
tempFileCreated = true;
await withWindowsRetry(() => fs.rename(tempPath, filePath));
await withEnsuredDirectory(path.dirname(filePath), async () => {
await fs.writeFile(tempPath, data);
tempFileCreated = true;
await withWindowsRetry(() => fs.rename(tempPath, filePath));
});
// Track this file in cache so future writes know it exists
createdFilesCache.add(filePath);
} catch (error) {
Expand Down Expand Up @@ -405,10 +435,11 @@ export async function writeExclusive(
filePath: string,
data: string
): Promise<boolean> {
await ensureDir(path.dirname(filePath));
try {
await fs.writeFile(filePath, data, { flag: 'wx' });
return true;
return await withEnsuredDirectory(path.dirname(filePath), async () => {
await fs.writeFile(filePath, data, { flag: 'wx' });
return true;
});
} catch (error: any) {
if (error.code === 'EEXIST') {
return false;
Expand Down Expand Up @@ -439,6 +470,12 @@ export async function listFilesByExtension(
interface PaginatedFileSystemQueryConfig<T> {
directory: string;
schema: z.ZodType<T>;
/**
* Optional immutable-item cache, keyed by absolute file path. Event files
* are append-only, so world-local can replay events it just persisted
* without rereading and reparsing them from disk.
*/
cachedItems?: ReadonlyMap<string, T>;
filePrefix?: string;
fileIdFilter?: (fileId: string) => boolean;
filter?: (item: T) => boolean;
Expand Down Expand Up @@ -474,6 +511,7 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
const {
directory,
schema,
cachedItems,
filePrefix,
fileIdFilter,
filter,
Expand All @@ -493,8 +531,10 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
assertSafeEntityId('filePrefix', filePrefix);
}

const resolvedDirectory = path.resolve(directory);

// 1. Get all JSON files in directory
const fileIds = await listJSONFiles(directory);
const fileIds = await listJSONFiles(resolvedDirectory);

// 2. Filter by prefix if provided
const relevantFileIds = filePrefix
Expand Down Expand Up @@ -541,10 +581,14 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
const validItems: T[] = [];

for (const fileId of candidateFileIds) {
const filePath = path.join(directory, `${fileId}.json`);
const filePath = path.join(resolvedDirectory, `${fileId}.json`);
let item: T | null = null;
try {
item = await readJSON(filePath, schema);
const cachedItem = cachedItems?.get(filePath);
Comment thread
pranaygp marked this conversation as resolved.
item =
cachedItem === undefined
? await readJSON(filePath, schema)
: structuredClone(cachedItem);
} catch (error: unknown) {
// We don't expect zod errors to happen, but if the JSON does get malformed,
// we skip the item. Preferably, we'd have a way to mark items as malformed,
Expand Down
9 changes: 8 additions & 1 deletion packages/world-local/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
const mergedConfig = { ...config.value, ...definedArgs };
const tag = mergedConfig.tag;
const queue = createQueue(mergedConfig);
const storage = createStorage(mergedConfig.dataDir, tag);
const { clearCache: clearStorageCache, ...storage } = createStorage(
mergedConfig.dataDir,
tag
);
const recoverActiveRuns = mergedConfig.recoverActiveRuns ?? true;
return {
specVersion: SPEC_VERSION_CURRENT,
Expand Down Expand Up @@ -94,9 +97,11 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
await reenqueueActiveRuns(recoveryRuns, queue.queue, 'world-local');
},
async close() {
clearStorageCache();
await queue.close();
},
async clear() {
clearStorageCache();
if (tag) {
// Selectively delete only files matching this tag
const basedir = mergedConfig.dataDir;
Expand Down Expand Up @@ -159,6 +164,8 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
// Clear the in-memory write cache so deleted paths are forgotten
clearCreatedFilesCache();
} else {
// `rm()` removes directories that the write path may have cached.
clearCreatedFilesCache();
await rm(mergedConfig.dataDir, { recursive: true, force: true });
await initDataDir(mergedConfig.dataDir);
}
Expand Down
Loading
Loading