Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/humble-ravens-send.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/logger': patch
---

Fix an issue where logger.proxy can throw for nil values
6 changes: 6 additions & 0 deletions .changeset/odd-sloths-make.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/engine-multi': patch
'@openfn/ws-worker': patch
---

Use streaming algorithm to calculate payload size
1 change: 0 additions & 1 deletion packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const createWorkflowEvents = (
workflowId: UUID
) => {
// proxy all events to the main emitter
// uh actually there may be no point in this
function proxy(event: string) {
context.on(event, (evt) => {
// ensure the run id is on the event
Expand Down
4 changes: 2 additions & 2 deletions packages/engine-multi/src/test/worker-functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ const tasks = {
// Most tests should use the mock-worker instead
run: async (plan: ExecutionPlan, _input: any, _adaptorPaths: any) => {
const workflowId = plan.id;
publish('worker:workflow-start', {
await publish('worker:workflow-start', {
workflowId,
});
try {
const [job] = plan.workflow.steps as Job[];
const result = eval(job.expression!);
publish('worker:workflow-complete', {
await publish('worker:workflow-complete', {
workflowId,
state: result,
});
Expand Down
80 changes: 70 additions & 10 deletions packages/engine-multi/src/util/ensure-payload-size.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Buffer } from 'node:buffer';
import { JsonStreamStringify } from 'json-stream-stringify';

// This specifies which keys of an event payload to potentially redact
Expand Down Expand Up @@ -25,7 +26,7 @@ export const verify = async (
if (algo === 'stream') {
sizeBytes = await calculateSizeStream(value, limitBytes);
} else {
sizeBytes = calculateSizeStringify(value);
sizeBytes = await calculateSizeStringify(value);
}

if (sizeBytes > limitBytes) {
Expand All @@ -38,16 +39,30 @@ export const verify = async (
}
};

export const calculateSizeStringify = (value: any): number => {
const str = typeof value === 'string' ? value : JSON.stringify(value);
const size_bytes = Buffer.byteLength(str, 'utf8');
return size_bytes;
export const calculateSizeStringify = (value: any): Promise<number> => {
return new Promise((resolve) => {
const str = typeof value === 'string' ? value : JSON.stringify(value);
const size_bytes = Buffer.byteLength(str, 'utf8');
setTimeout(() => resolve(size_bytes), 10);
return size_bytes;
});
};

export const calculateSizeStream = async (
value: any,
limit?: number
): Promise<number> => {
// skip all primitives
if (
!value ||
typeof value === 'number' ||
typeof value === 'boolean' ||
typeof value === 'function'
) {
// Treat as size 0
return 0;
}

let size_bytes = 0;

const stream = new JsonStreamStringify(value);
Expand All @@ -66,16 +81,61 @@ export const calculateSizeStream = async (
};

export default async (payload: any, limit_mb: number = 10) => {
if (!limit_mb || isNaN(limit_mb)) {
return payload;
}

const newPayload = { ...payload };

for (const key of KEYS_TO_VERIFY) {
try {
await verify(payload[key], limit_mb);
} catch (e) {
Object.assign(newPayload[key], replacements[key] ?? replacements.default);
newPayload.redacted = true;
if (key in payload) {
try {
await verify(payload[key], limit_mb, 'stringify');
} catch (e: any) {
if (e.name === 'PAYLOAD_TOO_LARGE') {
Object.assign(
newPayload[key],
replacements[key] ?? replacements.default
);
newPayload.redacted = true;
} else {
console.log(e);
}
}
}
}

return newPayload;
};

// export default async (payload: any, limit_mb: number = 10) => {
// return new Promise(async (resolve) => {
// if (!limit_mb || isNaN(limit_mb)) {
// resolve( payload);
// }

// const newPayload = { ...payload };

// for (const key of KEYS_TO_VERIFY) {
// if (key in payload) {
// try {
// await verify(payload[key], limit_mb, 'stream');
// } catch (e) {
// if (e.name === 'PAYLOAD_TOO_LARGE') {
// Object.assign(
// newPayload[key],
// replacements[key] ?? replacements.default
// );
// newPayload.redacted = true;
// }
// }
// }
// }

// setTimeout(() => {
// resolve(newPayload)

// }, 1000)

// })
// };
14 changes: 8 additions & 6 deletions packages/engine-multi/src/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
});

// This will forward all internal console.debug() lines to the parent stdout
if (options.proxyStdout) {
child.stdout!.on('data', (data) => {
console.log(`${child.pid ?? ''} |> ${data.toString()}`);
});
}
// if (options.proxyStdout) {
child.stdout!.on('data', (data) => {
console.log(`${child.pid ?? ''} |> ${data.toString()}`);
});
// }

logger.debug('pool: Created new child process', child.pid);
allWorkers[child.pid!] = child;
Expand Down Expand Up @@ -239,7 +239,9 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
if (!didTimeout) {
resolve(evt.result);

finish(worker);
setTimeout(() => {
finish(worker);
}, 10000);
}
} else if (evt.type === ENGINE_REJECT_TASK) {
// Note that this is an unexpected error
Expand Down
14 changes: 7 additions & 7 deletions packages/engine-multi/src/worker/thread/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ export const publish = async (
type: string,
payload: Omit<Event, 'threadId' | 'type'>
) => {
// console.log(type, payload)
// Validate the size of every outgoing message
// Redact any payloads that are too large
const limit =
payloadLimits?.[type as keyof PayloadLimits] ?? payloadLimits?.default;
const safePayload = await ensurePayloadSize(payload, limit);

parentPort!.postMessage({
type,
threadId,
Expand All @@ -82,21 +82,21 @@ export const publishSync = async (
const run = (task: string, args: any[], options: Options = {}) => {
payloadLimits = options.payloadLimits;
tasks[task](...args)
.then((result) => {
.then((result) =>
publish(ENGINE_RESOLVE_TASK, {
result,
});
})
.catch((e) => {
})
)
.catch((e) =>
publish(ENGINE_REJECT_TASK, {
error: {
severity: e.severity || 'crash',
message: e.message,
type: e.type,
name: e.name,
},
});
})
})
)
.finally(() => {
payloadLimits = undefined;
});
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ test.serial('engine api uses custom options', async (t) => {

// Note that this runs with the actual runtime worker
// I won't want to do deep testing on execute here - I just want to make sure the basic
// exeuction functionality is working. It's more a test of the api surface than the inner
// execution functionality is working. It's more a test of the api surface than the inner
// workings of the job
test.serial(
'execute should return an event listener and receive workflow-complete',
async (t) => {
return new Promise(async (done) => {
api = await createAPI({
logger,
proxyStdout: true,
});

const plan: ExecutionPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import ensurePayloadSize, {
});

test(algo + ': throw for limit 1byte, payload 2 bytes', async (t) => {
await t.throwsAsync(() => verify(12, 1 / 1024 / 1024, algo), {
await t.throwsAsync(() => verify('ab', 1 / 1024 / 1024, algo), {
name: 'PAYLOAD_TOO_LARGE',
});
});
Expand Down
16 changes: 4 additions & 12 deletions packages/engine-multi/test/worker/mock-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,33 +95,28 @@ test('execute a mock plan with delay', async (t) => {
});

test('Publish workflow-start event', async (t) => {
t.plan(1)
const plan = createPlan();
plan.id = 'xx';
let didFire = false;
await workers.exec('run', [plan, {}], {
on: ({ type }) => {
if (type === e.WORKFLOW_START) {
didFire = true;
t.pass()
}
},
});
t.true(didFire);
});

test('Publish workflow-complete event with state', async (t) => {
t.plan(1)
const plan = createPlan();
let didFire = false;
let state;
await workers.exec('run', [plan, {}], {
on: ({ type, ...args }) => {
if (type === e.WORKFLOW_COMPLETE) {
didFire = true;
state = args.state;
t.deepEqual(args.state, { data: { answer: 42 } });
}
},
});
t.true(didFire);
t.deepEqual(state, { data: { answer: 42 } });
});

test('Publish a job log event', async (t) => {
Expand All @@ -131,19 +126,16 @@ test('Publish a job log event', async (t) => {
return s;
}`,
});
let didFire = false;
let log: any;
let id;
await workers.exec('run', [plan, {}], {
on: ({ workflowId, type, log: _log }) => {
if (type === e.LOG) {
didFire = true;
log = _log;
id = workflowId;
}
},
});
t.true(didFire);
t.is(id, plan.id as any);

t.is(log.level, 'info');
Expand Down
2 changes: 2 additions & 0 deletions packages/logger/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ export default function (name?: string, options: LogOptions = {}): Logger {
}
j = j as JSONLog;

j.message ??= [];

log(j.name, j.level, ...j.message);
return [j.name, j.level, ...j.message];
};
Expand Down