diff --git a/.changeset/hungry-bees-call.md b/.changeset/hungry-bees-call.md new file mode 100644 index 000000000..9db59de19 --- /dev/null +++ b/.changeset/hungry-bees-call.md @@ -0,0 +1,5 @@ +--- +'@openfn/runtime': patch +--- + +make sure that notify are properly awaited diff --git a/.changeset/strict-windows-think.md b/.changeset/strict-windows-think.md new file mode 100644 index 000000000..4d42790cf --- /dev/null +++ b/.changeset/strict-windows-think.md @@ -0,0 +1,5 @@ +--- +'@openfn/engine-multi': patch +--- + +Make publish() async diff --git a/packages/engine-multi/src/worker/thread/run.ts b/packages/engine-multi/src/worker/thread/run.ts index a388faf0f..483753ee1 100644 --- a/packages/engine-multi/src/worker/thread/run.ts +++ b/packages/engine-multi/src/worker/thread/run.ts @@ -81,7 +81,7 @@ register({ notify: (name: NotifyEvents, payload: any) => { // @ts-ignore const mappedPayload = eventMap[name]?.(payload) ?? payload; - publish(`worker:${name}`, { + return publish(`worker:${name}`, { workflowId: plan.id, ...mappedPayload, }); diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index 819c35ade..a9d1e26e8 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -147,7 +147,7 @@ const executeStep = async ( const jobName = job.name || job.id; // The notify events only apply to jobs - not steps - so names don't need to be changed here - notify(NOTIFY_INIT_START, { jobId }); + await notify(NOTIFY_INIT_START, { jobId }); // lazy load config and state const configuration = await loadCredentials( @@ -166,7 +166,7 @@ const executeStep = async ( plan.workflow?.credentials ); - notify(NOTIFY_INIT_COMPLETE, { + await notify(NOTIFY_INIT_COMPLETE, { jobId, duration: Date.now() - duration, }); @@ -182,7 +182,7 @@ const executeStep = async ( const startTime = Date.now(); try { // TODO include the upstream job? - notify(NOTIFY_JOB_START, { jobId }); + await notify(NOTIFY_JOB_START, { jobId }); result = await executeExpression( ctx, job.expression!, @@ -212,7 +212,7 @@ const executeStep = async ( report(state, jobId, error); next = calculateNext(step, result, logger); - notify(NOTIFY_JOB_ERROR, { + await notify(NOTIFY_JOB_ERROR, { duration: Date.now() - startTime, error, state, @@ -267,7 +267,7 @@ const executeStep = async ( } next = calculateNext(step, result, logger); - notify(NOTIFY_JOB_COMPLETE, { + await notify(NOTIFY_JOB_COMPLETE, { duration: Date.now() - duration, state: result, jobId,