diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 9bcba0386..72a7275d7 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/cli +## 1.35.1 + +### Patch Changes + +- Updated dependencies [944ed19] + - @openfn/runtime@1.9.2 + ## 1.35.0 ### Minor Changes @@ -17,7 +24,6 @@ - 3918358: Override config.endpoint with one from openfn.yaml - ## 1.34.1 ### Patch Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index 9ad265111..56d359232 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.35.0", + "version": "1.35.1", "description": "CLI devtools for the OpenFn toolchain", "engines": { "node": ">=18", diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index ec951f209..7cdd7b4c2 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,14 @@ # engine-multi +## 1.11.3 + +### Patch Changes + +- 4a25473: Add debugging to for step start/complete +- 8d2954d: Make publish() async +- Updated dependencies [944ed19] + - @openfn/runtime@1.9.2 + ## 1.11.2 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 9e0ee8af5..228d90823 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.11.2", + "version": "1.11.3", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/lifecycle.ts b/packages/engine-multi/src/api/lifecycle.ts index 3de02cfcd..817140899 100644 --- a/packages/engine-multi/src/api/lifecycle.ts +++ b/packages/engine-multi/src/api/lifecycle.ts @@ -14,7 +14,6 @@ export const workflowStart = ( ) => { const { state, logger } = context; const { workflowId, threadId } = event; - logger.info('starting workflow ', workflowId); // where would this throw get caught? @@ -68,7 +67,9 @@ export const jobStart = ( context: ExecutionContext, event: internalEvents.JobStartEvent ) => { + const { logger, state } = context; const { threadId, jobId } = event; + logger.debug(`${state.id}: sending job start (step start): ${event.jobId}`); context.emit(externalEvents.JOB_START, { jobId, @@ -81,7 +82,11 @@ export const jobComplete = ( context: ExecutionContext, event: internalEvents.JobCompleteEvent ) => { + const { logger, state: runState } = context; const { threadId, state, duration, jobId, next, mem, redacted } = event; + logger.debug( + `${runState.id}: sending job complete (step complete): ${event.jobId}` + ); context.emit(externalEvents.JOB_COMPLETE, { threadId, diff --git a/packages/engine-multi/src/worker/thread/run.ts b/packages/engine-multi/src/worker/thread/run.ts index a388faf0f..483753ee1 100644 --- a/packages/engine-multi/src/worker/thread/run.ts +++ b/packages/engine-multi/src/worker/thread/run.ts @@ -81,7 +81,7 @@ register({ notify: (name: NotifyEvents, payload: any) => { // @ts-ignore const mappedPayload = eventMap[name]?.(payload) ?? payload; - publish(`worker:${name}`, { + return publish(`worker:${name}`, { workflowId: plan.id, ...mappedPayload, }); diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index ae53dab7d..a045e37d4 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/lightning-mock +## 2.4.15 + +### Patch Changes + +- Updated dependencies [944ed19] +- Updated dependencies [4a25473] +- Updated dependencies [8d2954d] + - @openfn/runtime@1.9.2 + - @openfn/engine-multi@1.11.3 + ## 2.4.14 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index b50b91962..744725e68 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.4.14", + "version": "2.4.15", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index 0dba2afd4..fd1106b96 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 1.9.2 + +### Patch Changes + +- 944ed19: make sure that notify are properly awaited + ## 1.9.1 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 3568d958a..4cd8172f5 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.9.1", + "version": "1.9.2", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 819c35ade..a9d1e26e8 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -147,7 +147,7 @@ const executeStep = async ( const jobName = job.name || job.id; // The notify events only apply to jobs - not steps - so names don't need to be changed here - notify(NOTIFY_INIT_START, { jobId }); + await notify(NOTIFY_INIT_START, { jobId }); // lazy load config and state const configuration = await loadCredentials( @@ -166,7 +166,7 @@ const executeStep = async ( plan.workflow?.credentials ); - notify(NOTIFY_INIT_COMPLETE, { + await notify(NOTIFY_INIT_COMPLETE, { jobId, duration: Date.now() - duration, }); @@ -182,7 +182,7 @@ const executeStep = async ( const startTime = Date.now(); try { // TODO include the upstream job? - notify(NOTIFY_JOB_START, { jobId }); + await notify(NOTIFY_JOB_START, { jobId }); result = await executeExpression( ctx, job.expression!, @@ -212,7 +212,7 @@ const executeStep = async ( report(state, jobId, error); next = calculateNext(step, result, logger); - notify(NOTIFY_JOB_ERROR, { + await notify(NOTIFY_JOB_ERROR, { duration: Date.now() - startTime, error, state, @@ -267,7 +267,7 @@ const executeStep = async ( } next = calculateNext(step, result, logger); - notify(NOTIFY_JOB_COMPLETE, { + await notify(NOTIFY_JOB_COMPLETE, { duration: Date.now() - duration, state: result, jobId, diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 8fef29b2e..acaf667d6 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,16 @@ # ws-worker +## 1.24.1 + +### Patch Changes + +- 7f93ddb: Fix an issue in batch events +- Updated dependencies [944ed19] +- Updated dependencies [4a25473] +- Updated dependencies [8d2954d] + - @openfn/runtime@1.9.2 + - @openfn/engine-multi@1.11.3 + ## 1.24.0 ### Minor Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index a70221a16..18ce60881 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.24.0", + "version": "1.24.1", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index 20718c32d..e84c0ca1a 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -183,12 +183,12 @@ export function eventProcessor( } }; - const addToBatch = async (event: any) => { + const addToBatch = async (event: any, triggerNext = false) => { batch.push(event); if (batch.length >= batchLimit) { // If we're at the batch limit, return right away - return sendBatch(true); + return sendBatch(triggerNext); } }; @@ -225,13 +225,13 @@ export function eventProcessor( if (batch.length >= batchLimit) { // If we're at the batch limit, return right away - return sendBatch(true); + return sendBatch(false); } } else { // If there's another pending item not a part of this batch, // just send the batch now // send the batch early - return sendBatch(true); + return sendBatch(false); } } @@ -265,7 +265,7 @@ export function eventProcessor( trace(`[${name}] executing immediately`); setImmediate(next); } else if (activeBatch === name) { - addToBatch(event); + addToBatch(event, true); queue.pop(); } else if (queue.length == 2 && batchTimeout) { trace('Sending batch early'); diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index d554063f4..b7199caad 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -378,6 +378,117 @@ test('should send a batch on interrupt with an async queue', async (t) => { }); }); +test('should not drop an event', async (t) => { + // const total = 1e5; // this works! + // t.timeout(1000 * 30) + + const total = 100; // this should suffice for unit tests + + t.plan(total) + return new Promise((resolve) => { + const events: any[] = []; + + const handler = (evt: any) => { + events.push(evt) + if (evt.id === total) { + let prevId= 0; + for(const e of events) { + t.is(e.id, prevId + 1) + prevId++ + } + resolve() + } + } + + const callbacks = createCallbacks({ + a: (_c: any, e: any) => { + handler(e) + }, + b: (_c: any, e: any) => { + handler(e) + }, + }); + + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['a', 'b'], + batchLimit: 6, + batchInterval: 100, + }); + + let count = 0; + // randomly send events at high volume + while(count < total) { + count++; + + if (Math.random() < 0.6) { + engine.emit('b', {id: count }) + } else { + engine.emit('a', {id: count }) + } + } + }); +}); + +test('should not drop an event in batch mode', async (t) => { + t.timeout(1000 * 30) + + const total = 100; // this should suffice for unit tests + return new Promise((resolve) => { + const events: any[] = []; + + const handler = (evt: any) => { + if (evt.id === total) { + let prevId= 0; + for(const e of events) { + t.is(e.id, prevId + 1) + prevId++ + } + resolve() + } + } + + const callbacks = createCallbacks({ + a: (_c: any, e: any) => { + events.push(e) + handler(e) + }, + b: (_context: any, evt: any) => { + events.push(...evt) + handler(evt.pop()) + }, + }); + + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['a', 'b'], + batch: { b: true }, + batchLimit: 6, + batchInterval: 100, + }); + + let count = 0; + // randomly send events at high volume + while(count < total) { + count++; + + if (Math.random() < 0.8) { + engine.emit('b', {id: count }) + } else { + engine.emit('a', {id: count }) + } + } + }); +}); + test('should continue processing if a callback throws', async (t) => { const result: number[] = []; const callbacks = createCallbacks({