diff --git a/.changeset/stale-lights-occur.md b/.changeset/stale-lights-occur.md new file mode 100644 index 000000000..24cee7694 --- /dev/null +++ b/.changeset/stale-lights-occur.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Fix an issue in batch events diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index 20718c32d..e84c0ca1a 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -183,12 +183,12 @@ export function eventProcessor( } }; - const addToBatch = async (event: any) => { + const addToBatch = async (event: any, triggerNext = false) => { batch.push(event); if (batch.length >= batchLimit) { // If we're at the batch limit, return right away - return sendBatch(true); + return sendBatch(triggerNext); } }; @@ -225,13 +225,13 @@ export function eventProcessor( if (batch.length >= batchLimit) { // If we're at the batch limit, return right away - return sendBatch(true); + return sendBatch(false); } } else { // If there's another pending item not a part of this batch, // just send the batch now // send the batch early - return sendBatch(true); + return sendBatch(false); } } @@ -265,7 +265,7 @@ export function eventProcessor( trace(`[${name}] executing immediately`); setImmediate(next); } else if (activeBatch === name) { - addToBatch(event); + addToBatch(event, true); queue.pop(); } else if (queue.length == 2 && batchTimeout) { trace('Sending batch early'); diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index d554063f4..b7199caad 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -378,6 +378,117 @@ test('should send a batch on interrupt with an async queue', async (t) => { }); }); +test('should not drop an event', async (t) => { + // const total = 1e5; // this works! + // t.timeout(1000 * 30) + + const total = 100; // this should suffice for unit tests + + t.plan(total) + return new Promise((resolve) => { + const events: any[] = []; + + const handler = (evt: any) => { + events.push(evt) + if (evt.id === total) { + let prevId= 0; + for(const e of events) { + t.is(e.id, prevId + 1) + prevId++ + } + resolve() + } + } + + const callbacks = createCallbacks({ + a: (_c: any, e: any) => { + handler(e) + }, + b: (_c: any, e: any) => { + handler(e) + }, + }); + + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['a', 'b'], + batchLimit: 6, + batchInterval: 100, + }); + + let count = 0; + // randomly send events at high volume + while(count < total) { + count++; + + if (Math.random() < 0.6) { + engine.emit('b', {id: count }) + } else { + engine.emit('a', {id: count }) + } + } + }); +}); + +test('should not drop an event in batch mode', async (t) => { + t.timeout(1000 * 30) + + const total = 100; // this should suffice for unit tests + return new Promise((resolve) => { + const events: any[] = []; + + const handler = (evt: any) => { + if (evt.id === total) { + let prevId= 0; + for(const e of events) { + t.is(e.id, prevId + 1) + prevId++ + } + resolve() + } + } + + const callbacks = createCallbacks({ + a: (_c: any, e: any) => { + events.push(e) + handler(e) + }, + b: (_context: any, evt: any) => { + events.push(...evt) + handler(evt.pop()) + }, + }); + + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['a', 'b'], + batch: { b: true }, + batchLimit: 6, + batchInterval: 100, + }); + + let count = 0; + // randomly send events at high volume + while(count < total) { + count++; + + if (Math.random() < 0.8) { + engine.emit('b', {id: count }) + } else { + engine.emit('a', {id: count }) + } + } + }); +}); + test('should continue processing if a callback throws', async (t) => { const result: number[] = []; const callbacks = createCallbacks({