Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/batch/services/run-batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import type { BatchItem } from "../types/batch-item.js";
import type { BatchResult, BatchSummary } from "../types/batch-result.js";
import { toErrorRecord } from "./to-error-record.js";

export interface RunBatchOptions {
concurrency: number;
items: AsyncIterable<BatchItem>;
onResult: (result: BatchResult) => void | Promise<void>;
worker: (item: BatchItem) => Promise<unknown>;
}

const noop = (): undefined => undefined;

export async function runBatch({
items,
concurrency,
worker,
onResult,
}: RunBatchOptions): Promise<BatchSummary> {
const summary: BatchSummary = { total: 0, succeeded: 0, failed: 0 };
const iterator = items[Symbol.asyncIterator]();

let pullLock: Promise<unknown> = Promise.resolve();
function nextItem(): Promise<IteratorResult<BatchItem>> {
const pending = pullLock.then(() => iterator.next());
pullLock = pending.then(noop, noop);
return pending;
}

let emitLock: Promise<unknown> = Promise.resolve();
function emit(result: BatchResult): Promise<unknown> {
emitLock = emitLock.then(() => onResult(result));
return emitLock;
}

async function drain(): Promise<void> {
while (true) {
const next = await nextItem();
if (next.done) {
return;
}

const item = next.value;
summary.total++;

try {
const data = await worker(item);
summary.succeeded++;
await emit({ ...item, ok: true, data });
} catch (err) {
summary.failed++;
await emit({ ...item, ok: false, error: toErrorRecord(err) });
}
}
}

const workerCount = Math.max(1, Math.floor(concurrency));
await Promise.all(Array.from({ length: workerCount }, () => drain()));
await emitLock;

return summary;
}
12 changes: 12 additions & 0 deletions src/batch/services/to-error-record.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { BatchErrorRecord } from "../types/batch-result.js";

export function toErrorRecord(err: unknown): BatchErrorRecord {
if (err instanceof Error) {
return {
class: err.constructor.name || err.name || "Error",
message: err.message,
};
}

return { class: "Error", message: String(err) };
}
24 changes: 24 additions & 0 deletions src/batch/types/batch-result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { BatchItem } from "./batch-item.js";

export interface BatchErrorRecord {
class: string;
message: string;
}

export interface BatchSuccess extends BatchItem {
data: unknown;
ok: true;
}

export interface BatchFailure extends BatchItem {
error: BatchErrorRecord;
ok: false;
}

export type BatchResult = BatchSuccess | BatchFailure;

export interface BatchSummary {
failed: number;
succeeded: number;
total: number;
}
84 changes: 84 additions & 0 deletions tests/batch/run-batch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { ValidationError } from "@decodo/sdk-ts";
import { describe, expect, it } from "vitest";
import { runBatch } from "../../src/batch/services/run-batch.js";
import type { BatchItem } from "../../src/batch/types/batch-item.js";
import type { BatchResult } from "../../src/batch/types/batch-result.js";

async function* toAsync(inputs: string[]): AsyncGenerator<BatchItem> {
let index = 0;
for (const input of inputs) {
await Promise.resolve();
yield { index, input };
index++;
}
}

function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

describe("runBatch", () => {
it("runs every item and emits success records", async () => {
const results: BatchResult[] = [];

const summary = await runBatch({
items: toAsync(["a", "b", "c"]),
concurrency: 2,
worker: (item) => Promise.resolve(item.input.toUpperCase()),
onResult: (result) => {
results.push(result);
},
});

expect(summary).toEqual({ total: 3, succeeded: 3, failed: 0 });
expect(results.map((r) => r.index).sort()).toEqual([0, 1, 2]);
const a = results.find((r) => r.input === "a");
expect(a).toMatchObject({ ok: true, data: "A" });
});

it("captures per-item errors without aborting the batch", async () => {
const results: BatchResult[] = [];

const summary = await runBatch({
items: toAsync(["ok-1", "boom", "ok-2"]),
concurrency: 3,
worker: (item) => {
if (item.input === "boom") {
throw new ValidationError("bad input");
}
return Promise.resolve(item.input);
},
onResult: (result) => {
results.push(result);
},
});

expect(summary).toEqual({ total: 3, succeeded: 2, failed: 1 });
const failure = results.find((r) => !r.ok);
expect(failure).toMatchObject({
ok: false,
input: "boom",
error: { class: "ValidationError", message: "bad input" },
});
});

it("never exceeds the configured concurrency", async () => {
let active = 0;
let peak = 0;

await runBatch({
items: toAsync(["1", "2", "3", "4", "5", "6"]),
concurrency: 2,
worker: async () => {
active++;
peak = Math.max(peak, active);
await delay(5);
active--;
},
onResult: () => undefined,
});

expect(peak).toBeLessThanOrEqual(2);
expect(peak).toBeGreaterThan(0);
});
});
Loading