Skip to content
Merged
8 changes: 7 additions & 1 deletion packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/cli

## 1.35.1

### Patch Changes

- Updated dependencies [944ed19]
- @openfn/runtime@1.9.2

## 1.35.0

### Minor Changes
Expand All @@ -17,7 +24,6 @@

- 3918358: Override config.endpoint with one from openfn.yaml


## 1.34.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.35.0",
"version": "1.35.1",
"description": "CLI devtools for the OpenFn toolchain",
"engines": {
"node": ">=18",
Expand Down
9 changes: 9 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# engine-multi

## 1.11.3

### Patch Changes

- 4a25473: Add debugging to for step start/complete
- 8d2954d: Make publish() async
- Updated dependencies [944ed19]
- @openfn/runtime@1.9.2

## 1.11.2

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.11.2",
"version": "1.11.3",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
7 changes: 6 additions & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export const workflowStart = (
) => {
const { state, logger } = context;
const { workflowId, threadId } = event;

logger.info('starting workflow ', workflowId);

// where would this throw get caught?
Expand Down Expand Up @@ -68,7 +67,9 @@ export const jobStart = (
context: ExecutionContext,
event: internalEvents.JobStartEvent
) => {
const { logger, state } = context;
const { threadId, jobId } = event;
logger.debug(`${state.id}: sending job start (step start): ${event.jobId}`);

context.emit(externalEvents.JOB_START, {
jobId,
Expand All @@ -81,7 +82,11 @@ export const jobComplete = (
context: ExecutionContext,
event: internalEvents.JobCompleteEvent
) => {
const { logger, state: runState } = context;
const { threadId, state, duration, jobId, next, mem, redacted } = event;
logger.debug(
`${runState.id}: sending job complete (step complete): ${event.jobId}`
);

context.emit(externalEvents.JOB_COMPLETE, {
threadId,
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ register({
notify: (name: NotifyEvents, payload: any) => {
// @ts-ignore
const mappedPayload = eventMap[name]?.(payload) ?? payload;
publish(`worker:${name}`, {
return publish(`worker:${name}`, {
workflowId: plan.id,
...mappedPayload,
});
Expand Down
10 changes: 10 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/lightning-mock

## 2.4.15

### Patch Changes

- Updated dependencies [944ed19]
- Updated dependencies [4a25473]
- Updated dependencies [8d2954d]
- @openfn/runtime@1.9.2
- @openfn/engine-multi@1.11.3

## 2.4.14

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.4.14",
"version": "2.4.15",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
6 changes: 6 additions & 0 deletions packages/runtime/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/runtime

## 1.9.2

### Patch Changes

- 944ed19: make sure that notify are properly awaited

## 1.9.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/runtime",
"version": "1.9.1",
"version": "1.9.2",
"description": "Job processing runtime.",
"type": "module",
"exports": {
Expand Down
10 changes: 5 additions & 5 deletions packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ const executeStep = async (
const jobName = job.name || job.id;

// The notify events only apply to jobs - not steps - so names don't need to be changed here
notify(NOTIFY_INIT_START, { jobId });
await notify(NOTIFY_INIT_START, { jobId });

// lazy load config and state
const configuration = await loadCredentials(
Expand All @@ -166,7 +166,7 @@ const executeStep = async (
plan.workflow?.credentials
);

notify(NOTIFY_INIT_COMPLETE, {
await notify(NOTIFY_INIT_COMPLETE, {
jobId,
duration: Date.now() - duration,
});
Expand All @@ -182,7 +182,7 @@ const executeStep = async (
const startTime = Date.now();
try {
// TODO include the upstream job?
notify(NOTIFY_JOB_START, { jobId });
await notify(NOTIFY_JOB_START, { jobId });
result = await executeExpression(
ctx,
job.expression!,
Expand Down Expand Up @@ -212,7 +212,7 @@ const executeStep = async (
report(state, jobId, error);

next = calculateNext(step, result, logger);
notify(NOTIFY_JOB_ERROR, {
await notify(NOTIFY_JOB_ERROR, {
duration: Date.now() - startTime,
error,
state,
Expand Down Expand Up @@ -267,7 +267,7 @@ const executeStep = async (
}

next = calculateNext(step, result, logger);
notify(NOTIFY_JOB_COMPLETE, {
await notify(NOTIFY_JOB_COMPLETE, {
duration: Date.now() - duration,
state: result,
jobId,
Expand Down
11 changes: 11 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# ws-worker

## 1.24.1

### Patch Changes

- 7f93ddb: Fix an issue in batch events
- Updated dependencies [944ed19]
- Updated dependencies [4a25473]
- Updated dependencies [8d2954d]
- @openfn/runtime@1.9.2
- @openfn/engine-multi@1.11.3

## 1.24.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.24.0",
"version": "1.24.1",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
10 changes: 5 additions & 5 deletions packages/ws-worker/src/api/process-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ export function eventProcessor(
}
};

const addToBatch = async (event: any) => {
const addToBatch = async (event: any, triggerNext = false) => {
batch.push(event);

if (batch.length >= batchLimit) {
// If we're at the batch limit, return right away
return sendBatch(true);
return sendBatch(triggerNext);
}
};

Expand Down Expand Up @@ -225,13 +225,13 @@ export function eventProcessor(

if (batch.length >= batchLimit) {
// If we're at the batch limit, return right away
return sendBatch(true);
return sendBatch(false);
}
} else {
// If there's another pending item not a part of this batch,
// just send the batch now
// send the batch early
return sendBatch(true);
return sendBatch(false);
}
}

Expand Down Expand Up @@ -265,7 +265,7 @@ export function eventProcessor(
trace(`[${name}] executing immediately`);
setImmediate(next);
} else if (activeBatch === name) {
addToBatch(event);
addToBatch(event, true);
queue.pop();
} else if (queue.length == 2 && batchTimeout) {
trace('Sending batch early');
Expand Down
111 changes: 111 additions & 0 deletions packages/ws-worker/test/api/process-event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,117 @@ test('should send a batch on interrupt with an async queue', async (t) => {
});
});

test('should not drop an event', async (t) => {
// const total = 1e5; // this works!
// t.timeout(1000 * 30)

const total = 100; // this should suffice for unit tests

t.plan(total)
return new Promise((resolve) => {
const events: any[] = [];

const handler = (evt: any) => {
events.push(evt)
if (evt.id === total) {
let prevId= 0;
for(const e of events) {
t.is(e.id, prevId + 1)
prevId++
}
resolve()
}
}

const callbacks = createCallbacks({
a: (_c: any, e: any) => {
handler(e)
},
b: (_c: any, e: any) => {
handler(e)
},
});

const engine = createFakeEngine();
const context: any = {
logger,
};

eventProcessor(engine, context, callbacks, {
events: ['a', 'b'],
batchLimit: 6,
batchInterval: 100,
});

let count = 0;
// randomly send events at high volume
while(count < total) {
count++;

if (Math.random() < 0.6) {
engine.emit('b', {id: count })
} else {
engine.emit('a', {id: count })
}
}
});
});

test('should not drop an event in batch mode', async (t) => {
t.timeout(1000 * 30)

const total = 100; // this should suffice for unit tests
return new Promise((resolve) => {
const events: any[] = [];

const handler = (evt: any) => {
if (evt.id === total) {
let prevId= 0;
for(const e of events) {
t.is(e.id, prevId + 1)
prevId++
}
resolve()
}
}

const callbacks = createCallbacks({
a: (_c: any, e: any) => {
events.push(e)
handler(e)
},
b: (_context: any, evt: any) => {
events.push(...evt)
handler(evt.pop())
},
});

const engine = createFakeEngine();
const context: any = {
logger,
};

eventProcessor(engine, context, callbacks, {
events: ['a', 'b'],
batch: { b: true },
batchLimit: 6,
batchInterval: 100,
});

let count = 0;
// randomly send events at high volume
while(count < total) {
count++;

if (Math.random() < 0.8) {
engine.emit('b', {id: count })
} else {
engine.emit('a', {id: count })
}
}
});
});

test('should continue processing if a callback throws', async (t) => {
const result: number[] = [];
const callbacks = createCallbacks({
Expand Down