From 944ed19097af17fca7ad5fc324f6f10ec131f4a8 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 24 Apr 2026 11:34:58 +0100 Subject: [PATCH 1/2] runtime: ensure notify calls are awaited We added an async await to ensurePayloadSize a little while ago, but the runtime doens't actually wait for this. It's plausble that while waiting for a payload to be calculated, a step could finish and trigger the complete event early --- .changeset/hungry-bees-call.md | 5 +++++ packages/runtime/src/execute/step.ts | 10 +++++----- 2 files changed, 10 insertions(+), 5 deletions(-) create mode 100644 .changeset/hungry-bees-call.md diff --git a/.changeset/hungry-bees-call.md b/.changeset/hungry-bees-call.md new file mode 100644 index 000000000..9db59de19 --- /dev/null +++ b/.changeset/hungry-bees-call.md @@ -0,0 +1,5 @@ +--- +'@openfn/runtime': patch +--- + +make sure that notify are properly awaited diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 819c35ade..a9d1e26e8 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -147,7 +147,7 @@ const executeStep = async ( const jobName = job.name || job.id; // The notify events only apply to jobs - not steps - so names don't need to be changed here - notify(NOTIFY_INIT_START, { jobId }); + await notify(NOTIFY_INIT_START, { jobId }); // lazy load config and state const configuration = await loadCredentials( @@ -166,7 +166,7 @@ const executeStep = async ( plan.workflow?.credentials ); - notify(NOTIFY_INIT_COMPLETE, { + await notify(NOTIFY_INIT_COMPLETE, { jobId, duration: Date.now() - duration, }); @@ -182,7 +182,7 @@ const executeStep = async ( const startTime = Date.now(); try { // TODO include the upstream job? - notify(NOTIFY_JOB_START, { jobId }); + await notify(NOTIFY_JOB_START, { jobId }); result = await executeExpression( ctx, job.expression!, @@ -212,7 +212,7 @@ const executeStep = async ( report(state, jobId, error); next = calculateNext(step, result, logger); - notify(NOTIFY_JOB_ERROR, { + await notify(NOTIFY_JOB_ERROR, { duration: Date.now() - startTime, error, state, @@ -267,7 +267,7 @@ const executeStep = async ( } next = calculateNext(step, result, logger); - notify(NOTIFY_JOB_COMPLETE, { + await notify(NOTIFY_JOB_COMPLETE, { duration: Date.now() - duration, state: result, jobId, From 8d2954d4e79490fba140e7c51d3ea1dba65c0a89 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 24 Apr 2026 11:35:53 +0100 Subject: [PATCH 2/2] engine: return result of publish --- .changeset/strict-windows-think.md | 5 +++++ packages/engine-multi/src/worker/thread/run.ts | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 .changeset/strict-windows-think.md diff --git a/.changeset/strict-windows-think.md b/.changeset/strict-windows-think.md new file mode 100644 index 000000000..4d42790cf --- /dev/null +++ b/.changeset/strict-windows-think.md @@ -0,0 +1,5 @@ +--- +'@openfn/engine-multi': patch +--- + +Make publish() async diff --git a/packages/engine-multi/src/worker/thread/run.ts b/packages/engine-multi/src/worker/thread/run.ts index a388faf0f..483753ee1 100644 --- a/packages/engine-multi/src/worker/thread/run.ts +++ b/packages/engine-multi/src/worker/thread/run.ts @@ -81,7 +81,7 @@ register({ notify: (name: NotifyEvents, payload: any) => { // @ts-ignore const mappedPayload = eventMap[name]?.(payload) ?? payload; - publish(`worker:${name}`, { + return publish(`worker:${name}`, { workflowId: plan.id, ...mappedPayload, });