From f6eb80141c001402ce41033ba49ec5eb6c167bb3 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 23 Apr 2026 14:57:50 +0100 Subject: [PATCH 1/5] create a horrible test that reproduces the issue! --- .../ws-worker/test/api/process-event.test.ts | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index d554063f4..8f7203eff 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -378,6 +378,103 @@ test('should send a batch on interrupt with an async queue', async (t) => { }); }); +// debugging +/** + * this test sort of works now + * + * it send 100 events (maybe more) and expects them all to be received + * But it fails because sometimes events are dropped + * sometimes it just doesn't exit either + * + * I'd like to tidy this somehow + */ +test.only('!!!', async (t) => { + t.timeout(1000 * 30) + return new Promise((resolve) => { + let count = 0; + let didStart; + let done = false; + const events = []; + + const maybeFinish = (evt) => { + if (done && evt.id === count) { + console.log('>>> finishing', evt.id) + console.log(count, events.length) + console.log(events) + + let prevId= 0; + for(const e of events) { + if (e.id == prevId + 1) { + prevId++ + } else { + console.log('fail on ', e) + + t.fail() + resolve() + break; + } + } + resolve() + } + } + + const callbacks = createCallbacks({ + start: (c, e) => { + events.push(e) + console.log('start') + maybeFinish(e) + }, + end: (c, e) => { + events.push(e) + console.log('end') + maybeFinish(e) + }, + test: (_context: any, evt: any) => { + events.push(...evt) + console.log('test', evt.length) + console.log(evt.at(-1)) + maybeFinish(evt.pop()) // send the last one + }, + }); + + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test', 'start', 'end'], + batch: { test: true }, + batchLimit: 6, + batchInterval: 100, + }); + + // randomly send events at high volume + // every start should be paired with an end + while(count < 100 || !didStart) { + count++; + + if (Math.random() < 0.8) { + engine.emit('test', {id: count }) + } else { + if (didStart) { + engine.emit('end', {id: count }) + didStart = false; + } else { + engine.emit('start', {id: count }) + didStart = true; + } + } + } + done = true; + + // // TODO assert event order is correct + // t.pass() + // resolve() + + }); +}); + test('should continue processing if a callback throws', async (t) => { const result: number[] = []; const callbacks = createCallbacks({ From f48848e9a4cf17b5d5efbf99f4ee70aebd6571ff Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 23 Apr 2026 15:27:37 +0100 Subject: [PATCH 2/5] nicer tests --- .../ws-worker/test/api/process-event.test.ts | 113 +++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 8f7203eff..436d0e375 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -378,6 +378,117 @@ test('should send a batch on interrupt with an async queue', async (t) => { }); }); +test('should not drop an event', async (t) => { + // const total = 1e5; // this works! + // t.timeout(1000 * 30) + + const total = 100; // this should suffice for unit tests + + t.plan(total) + return new Promise((resolve) => { + const events: any[] = []; + + const handler = (evt: any) => { + events.push(evt) + if (evt.id === total) { + let prevId= 0; + for(const e of events) { + t.is(e.id, prevId + 1) + prevId++ + } + resolve() + } + } + + const callbacks = createCallbacks({ + a: (_c: any, e: any) => { + handler(e) + }, + b: (_c: any, e: any) => { + handler(e) + }, + }); + + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['a', 'b'], + batchLimit: 6, + batchInterval: 100, + }); + + let count = 0; + // randomly send events at high volume + while(count < total) { + count++; + + if (Math.random() < 0.6) { + engine.emit('b', {id: count }) + } else { + engine.emit('a', {id: count }) + } + } + }); +}); + +test.only('should not drop an event in batch mode', async (t) => { + t.timeout(1000 * 30) + + const total = 100; // this should suffice for unit tests + return new Promise((resolve) => { + const events: any[] = []; + + const handler = (evt) => { + if (evt.id === total) { + let prevId= 0; + for(const e of events) { + t.is(e.id, prevId + 1) + prevId++ + } + resolve() + } + } + + const callbacks = createCallbacks({ + a: (c, e) => { + events.push(e) + handler(e) + }, + b: (_context: any, evt: any) => { + events.push(...evt) + handler(evt.pop()) + }, + }); + + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['a', 'b'], + batch: { b: true }, + batchLimit: 6, + batchInterval: 100, + }); + + let count = 0; + // randomly send events at high volume + while(count < total) { + count++; + + if (Math.random() < 0.8) { + engine.emit('b', {id: count }) + } else { + engine.emit('a', {id: count }) + } + } + }); +}); + // debugging /** * this test sort of works now @@ -388,7 +499,7 @@ test('should send a batch on interrupt with an async queue', async (t) => { * * I'd like to tidy this somehow */ -test.only('!!!', async (t) => { +test.skip('!!!', async (t) => { t.timeout(1000 * 30) return new Promise((resolve) => { let count = 0; From e82854efaca9ddecb8e0bd4fc6ba9bebbc0de768 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 23 Apr 2026 15:57:22 +0100 Subject: [PATCH 3/5] possible fix --- packages/ws-worker/src/api/process-events.ts | 10 +++++----- packages/ws-worker/test/api/process-event.test.ts | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index 20718c32d..e84c0ca1a 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -183,12 +183,12 @@ export function eventProcessor( } }; - const addToBatch = async (event: any) => { + const addToBatch = async (event: any, triggerNext = false) => { batch.push(event); if (batch.length >= batchLimit) { // If we're at the batch limit, return right away - return sendBatch(true); + return sendBatch(triggerNext); } }; @@ -225,13 +225,13 @@ export function eventProcessor( if (batch.length >= batchLimit) { // If we're at the batch limit, return right away - return sendBatch(true); + return sendBatch(false); } } else { // If there's another pending item not a part of this batch, // just send the batch now // send the batch early - return sendBatch(true); + return sendBatch(false); } } @@ -265,7 +265,7 @@ export function eventProcessor( trace(`[${name}] executing immediately`); setImmediate(next); } else if (activeBatch === name) { - addToBatch(event); + addToBatch(event, true); queue.pop(); } else if (queue.length == 2 && batchTimeout) { trace('Sending batch early'); diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 436d0e375..22fdcd7aa 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -434,7 +434,7 @@ test('should not drop an event', async (t) => { }); }); -test.only('should not drop an event in batch mode', async (t) => { +test('should not drop an event in batch mode', async (t) => { t.timeout(1000 * 30) const total = 100; // this should suffice for unit tests From 8695e19128e419ccd379f2db70143caf275e57c8 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 23 Apr 2026 16:22:42 +0100 Subject: [PATCH 4/5] types and remove old test --- .../ws-worker/test/api/process-event.test.ts | 101 +----------------- 1 file changed, 2 insertions(+), 99 deletions(-) diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 22fdcd7aa..b7199caad 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -441,7 +441,7 @@ test('should not drop an event in batch mode', async (t) => { return new Promise((resolve) => { const events: any[] = []; - const handler = (evt) => { + const handler = (evt: any) => { if (evt.id === total) { let prevId= 0; for(const e of events) { @@ -453,7 +453,7 @@ test('should not drop an event in batch mode', async (t) => { } const callbacks = createCallbacks({ - a: (c, e) => { + a: (_c: any, e: any) => { events.push(e) handler(e) }, @@ -489,103 +489,6 @@ test('should not drop an event in batch mode', async (t) => { }); }); -// debugging -/** - * this test sort of works now - * - * it send 100 events (maybe more) and expects them all to be received - * But it fails because sometimes events are dropped - * sometimes it just doesn't exit either - * - * I'd like to tidy this somehow - */ -test.skip('!!!', async (t) => { - t.timeout(1000 * 30) - return new Promise((resolve) => { - let count = 0; - let didStart; - let done = false; - const events = []; - - const maybeFinish = (evt) => { - if (done && evt.id === count) { - console.log('>>> finishing', evt.id) - console.log(count, events.length) - console.log(events) - - let prevId= 0; - for(const e of events) { - if (e.id == prevId + 1) { - prevId++ - } else { - console.log('fail on ', e) - - t.fail() - resolve() - break; - } - } - resolve() - } - } - - const callbacks = createCallbacks({ - start: (c, e) => { - events.push(e) - console.log('start') - maybeFinish(e) - }, - end: (c, e) => { - events.push(e) - console.log('end') - maybeFinish(e) - }, - test: (_context: any, evt: any) => { - events.push(...evt) - console.log('test', evt.length) - console.log(evt.at(-1)) - maybeFinish(evt.pop()) // send the last one - }, - }); - - const engine = createFakeEngine(); - const context: any = { - logger, - }; - - eventProcessor(engine, context, callbacks, { - events: ['test', 'start', 'end'], - batch: { test: true }, - batchLimit: 6, - batchInterval: 100, - }); - - // randomly send events at high volume - // every start should be paired with an end - while(count < 100 || !didStart) { - count++; - - if (Math.random() < 0.8) { - engine.emit('test', {id: count }) - } else { - if (didStart) { - engine.emit('end', {id: count }) - didStart = false; - } else { - engine.emit('start', {id: count }) - didStart = true; - } - } - } - done = true; - - // // TODO assert event order is correct - // t.pass() - // resolve() - - }); -}); - test('should continue processing if a callback throws', async (t) => { const result: number[] = []; const callbacks = createCallbacks({ From 7f93ddb49320f24e3368b785be9009c54c74191c Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 24 Apr 2026 12:04:12 +0100 Subject: [PATCH 5/5] changeset --- .changeset/stale-lights-occur.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/stale-lights-occur.md diff --git a/.changeset/stale-lights-occur.md b/.changeset/stale-lights-occur.md new file mode 100644 index 000000000..24cee7694 --- /dev/null +++ b/.changeset/stale-lights-occur.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Fix an issue in batch events