diff --git a/.changeset/humble-ravens-send.md b/.changeset/humble-ravens-send.md new file mode 100644 index 000000000..94456009d --- /dev/null +++ b/.changeset/humble-ravens-send.md @@ -0,0 +1,5 @@ +--- +'@openfn/logger': patch +--- + +Fix an issue where logger.proxy can throw for nil values diff --git a/.changeset/odd-sloths-make.md b/.changeset/odd-sloths-make.md new file mode 100644 index 000000000..5fcdb347c --- /dev/null +++ b/.changeset/odd-sloths-make.md @@ -0,0 +1,6 @@ +--- +'@openfn/engine-multi': patch +'@openfn/ws-worker': patch +--- + +Use streaming algorithm to calculate payload size diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 31ecc0774..afd82f655 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -40,7 +40,6 @@ const createWorkflowEvents = ( workflowId: UUID ) => { // proxy all events to the main emitter - // uh actually there may be no point in this function proxy(event: string) { context.on(event, (evt) => { // ensure the run id is on the event diff --git a/packages/engine-multi/src/test/worker-functions.ts b/packages/engine-multi/src/test/worker-functions.ts index dca335cea..08ccd44ef 100644 --- a/packages/engine-multi/src/test/worker-functions.ts +++ b/packages/engine-multi/src/test/worker-functions.ts @@ -29,13 +29,13 @@ const tasks = { // Most tests should use the mock-worker instead run: async (plan: ExecutionPlan, _input: any, _adaptorPaths: any) => { const workflowId = plan.id; - publish('worker:workflow-start', { + await publish('worker:workflow-start', { workflowId, }); try { const [job] = plan.workflow.steps as Job[]; const result = eval(job.expression!); - publish('worker:workflow-complete', { + await publish('worker:workflow-complete', { workflowId, state: result, }); diff --git a/packages/engine-multi/src/util/ensure-payload-size.ts b/packages/engine-multi/src/util/ensure-payload-size.ts index db6dc6ee3..feb4ead85 100644 --- a/packages/engine-multi/src/util/ensure-payload-size.ts +++ b/packages/engine-multi/src/util/ensure-payload-size.ts @@ -1,3 +1,4 @@ +import { Buffer } from 'node:buffer'; import { JsonStreamStringify } from 'json-stream-stringify'; // This specifies which keys of an event payload to potentially redact @@ -25,7 +26,7 @@ export const verify = async ( if (algo === 'stream') { sizeBytes = await calculateSizeStream(value, limitBytes); } else { - sizeBytes = calculateSizeStringify(value); + sizeBytes = await calculateSizeStringify(value); } if (sizeBytes > limitBytes) { @@ -38,16 +39,30 @@ export const verify = async ( } }; -export const calculateSizeStringify = (value: any): number => { - const str = typeof value === 'string' ? value : JSON.stringify(value); - const size_bytes = Buffer.byteLength(str, 'utf8'); - return size_bytes; +export const calculateSizeStringify = (value: any): Promise => { + return new Promise((resolve) => { + const str = typeof value === 'string' ? value : JSON.stringify(value); + const size_bytes = Buffer.byteLength(str, 'utf8'); + setTimeout(() => resolve(size_bytes), 10); + return size_bytes; + }); }; export const calculateSizeStream = async ( value: any, limit?: number ): Promise => { + // skip all primitives + if ( + !value || + typeof value === 'number' || + typeof value === 'boolean' || + typeof value === 'function' + ) { + // Treat as size 0 + return 0; + } + let size_bytes = 0; const stream = new JsonStreamStringify(value); @@ -66,16 +81,61 @@ export const calculateSizeStream = async ( }; export default async (payload: any, limit_mb: number = 10) => { + if (!limit_mb || isNaN(limit_mb)) { + return payload; + } + const newPayload = { ...payload }; for (const key of KEYS_TO_VERIFY) { - try { - await verify(payload[key], limit_mb); - } catch (e) { - Object.assign(newPayload[key], replacements[key] ?? replacements.default); - newPayload.redacted = true; + if (key in payload) { + try { + await verify(payload[key], limit_mb, 'stringify'); + } catch (e: any) { + if (e.name === 'PAYLOAD_TOO_LARGE') { + Object.assign( + newPayload[key], + replacements[key] ?? replacements.default + ); + newPayload.redacted = true; + } else { + console.log(e); + } + } } } return newPayload; }; + +// export default async (payload: any, limit_mb: number = 10) => { +// return new Promise(async (resolve) => { +// if (!limit_mb || isNaN(limit_mb)) { +// resolve( payload); +// } + +// const newPayload = { ...payload }; + +// for (const key of KEYS_TO_VERIFY) { +// if (key in payload) { +// try { +// await verify(payload[key], limit_mb, 'stream'); +// } catch (e) { +// if (e.name === 'PAYLOAD_TOO_LARGE') { +// Object.assign( +// newPayload[key], +// replacements[key] ?? replacements.default +// ); +// newPayload.redacted = true; +// } +// } +// } +// } + +// setTimeout(() => { +// resolve(newPayload) + +// }, 1000) + +// }) +// }; diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 48a268803..dc23139ee 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -99,11 +99,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { }); // This will forward all internal console.debug() lines to the parent stdout - if (options.proxyStdout) { - child.stdout!.on('data', (data) => { - console.log(`${child.pid ?? ''} |> ${data.toString()}`); - }); - } + // if (options.proxyStdout) { + child.stdout!.on('data', (data) => { + console.log(`${child.pid ?? ''} |> ${data.toString()}`); + }); + // } logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; @@ -239,7 +239,9 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { if (!didTimeout) { resolve(evt.result); - finish(worker); + setTimeout(() => { + finish(worker); + }, 10000); } } else if (evt.type === ENGINE_REJECT_TASK) { // Note that this is an unexpected error diff --git a/packages/engine-multi/src/worker/thread/runtime.ts b/packages/engine-multi/src/worker/thread/runtime.ts index b40059dbe..680797989 100644 --- a/packages/engine-multi/src/worker/thread/runtime.ts +++ b/packages/engine-multi/src/worker/thread/runtime.ts @@ -51,12 +51,12 @@ export const publish = async ( type: string, payload: Omit ) => { + // console.log(type, payload) // Validate the size of every outgoing message // Redact any payloads that are too large const limit = payloadLimits?.[type as keyof PayloadLimits] ?? payloadLimits?.default; const safePayload = await ensurePayloadSize(payload, limit); - parentPort!.postMessage({ type, threadId, @@ -82,12 +82,12 @@ export const publishSync = async ( const run = (task: string, args: any[], options: Options = {}) => { payloadLimits = options.payloadLimits; tasks[task](...args) - .then((result) => { + .then((result) => publish(ENGINE_RESOLVE_TASK, { result, - }); - }) - .catch((e) => { + }) + ) + .catch((e) => publish(ENGINE_REJECT_TASK, { error: { severity: e.severity || 'crash', @@ -95,8 +95,8 @@ const run = (task: string, args: any[], options: Options = {}) => { type: e.type, name: e.name, }, - }); - }) + }) + ) .finally(() => { payloadLimits = undefined; }); diff --git a/packages/engine-multi/test/api.test.ts b/packages/engine-multi/test/api.test.ts index 40759f5ed..d8c1d2015 100644 --- a/packages/engine-multi/test/api.test.ts +++ b/packages/engine-multi/test/api.test.ts @@ -80,7 +80,7 @@ test.serial('engine api uses custom options', async (t) => { // Note that this runs with the actual runtime worker // I won't want to do deep testing on execute here - I just want to make sure the basic -// exeuction functionality is working. It's more a test of the api surface than the inner +// execution functionality is working. It's more a test of the api surface than the inner // workings of the job test.serial( 'execute should return an event listener and receive workflow-complete', @@ -88,6 +88,7 @@ test.serial( return new Promise(async (done) => { api = await createAPI({ logger, + proxyStdout: true, }); const plan: ExecutionPlan = { diff --git a/packages/engine-multi/test/util/ensure-payload-size.test.ts b/packages/engine-multi/test/util/ensure-payload-size.test.ts index 6babd7f7b..55351f6ff 100644 --- a/packages/engine-multi/test/util/ensure-payload-size.test.ts +++ b/packages/engine-multi/test/util/ensure-payload-size.test.ts @@ -17,7 +17,7 @@ import ensurePayloadSize, { }); test(algo + ': throw for limit 1byte, payload 2 bytes', async (t) => { - await t.throwsAsync(() => verify(12, 1 / 1024 / 1024, algo), { + await t.throwsAsync(() => verify('ab', 1 / 1024 / 1024, algo), { name: 'PAYLOAD_TOO_LARGE', }); }); diff --git a/packages/engine-multi/test/worker/mock-worker.test.ts b/packages/engine-multi/test/worker/mock-worker.test.ts index 6e519cda6..3aa07cb87 100644 --- a/packages/engine-multi/test/worker/mock-worker.test.ts +++ b/packages/engine-multi/test/worker/mock-worker.test.ts @@ -95,33 +95,28 @@ test('execute a mock plan with delay', async (t) => { }); test('Publish workflow-start event', async (t) => { + t.plan(1) const plan = createPlan(); plan.id = 'xx'; - let didFire = false; await workers.exec('run', [plan, {}], { on: ({ type }) => { if (type === e.WORKFLOW_START) { - didFire = true; + t.pass() } }, }); - t.true(didFire); }); test('Publish workflow-complete event with state', async (t) => { + t.plan(1) const plan = createPlan(); - let didFire = false; - let state; await workers.exec('run', [plan, {}], { on: ({ type, ...args }) => { if (type === e.WORKFLOW_COMPLETE) { - didFire = true; - state = args.state; + t.deepEqual(args.state, { data: { answer: 42 } }); } }, }); - t.true(didFire); - t.deepEqual(state, { data: { answer: 42 } }); }); test('Publish a job log event', async (t) => { @@ -131,19 +126,16 @@ test('Publish a job log event', async (t) => { return s; }`, }); - let didFire = false; let log: any; let id; await workers.exec('run', [plan, {}], { on: ({ workflowId, type, log: _log }) => { if (type === e.LOG) { - didFire = true; log = _log; id = workflowId; } }, }); - t.true(didFire); t.is(id, plan.id as any); t.is(log.level, 'info'); diff --git a/packages/logger/src/logger.ts b/packages/logger/src/logger.ts index ab064efa1..245a94fca 100644 --- a/packages/logger/src/logger.ts +++ b/packages/logger/src/logger.ts @@ -262,6 +262,8 @@ export default function (name?: string, options: LogOptions = {}): Logger { } j = j as JSONLog; + j.message ??= []; + log(j.name, j.level, ...j.message); return [j.name, j.level, ...j.message]; };