Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ src/
│ ├── read-record-step-executor.ts # AI-powered record field reading step
│ ├── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow)
│ ├── trigger-record-action-step-executor.ts # AI-powered action trigger step (with confirmation flow)
│ └── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow)
│ ├── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow)
│ └── guidance-step-executor.ts # Manual guidance step (saves user input, no AI)
├── http/ # HTTP server (optional, for frontend data access)
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger
└── index.ts # Barrel exports
Expand Down
38 changes: 38 additions & 0 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
StepStateError,
WorkflowExecutorError,
} from '../errors';
import patchBodySchemas from '../pending-data-validators';
import SafeAgentPort from './safe-agent-port';
import StepSummaryBuilder from './summary/step-summary-builder';

Expand Down Expand Up @@ -118,6 +119,43 @@
);
}

/**
* Finds an existing pending execution and, when pendingData is provided,
* validates it against the step-type schema and merges it into the execution.
* Returns the (possibly updated) execution, or undefined if none exists.
*/
protected async patchAndReloadPendingData<TExec extends WithPendingData>(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium executors/base-step-executor.ts:127

patchAndReloadPendingData uses a non-null assertion (!) on patchBodySchemas[execution.type], but patchBodySchemas is a Partial that only includes schemas for specific step types. When the method is called with an execution whose type is not in that set (e.g., condition or read-record), schema becomes undefined and schema.safeParse(pendingData) throws a runtime error. Consider validating that the schema exists and throwing a StepStateError when the step type does not support pending data patches.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/workflow-executor/src/executors/base-step-executor.ts around line 127:

`patchAndReloadPendingData` uses a non-null assertion (`!`) on `patchBodySchemas[execution.type]`, but `patchBodySchemas` is a `Partial` that only includes schemas for specific step types. When the method is called with an execution whose type is not in that set (e.g., `condition` or `read-record`), `schema` becomes `undefined` and `schema.safeParse(pendingData)` throws a runtime error. Consider validating that the schema exists and throwing a `StepStateError` when the step type does not support pending data patches.

Evidence trail:
- packages/workflow-executor/src/executors/base-step-executor.ts lines 127-140 (REVIEWED_COMMIT): `patchAndReloadPendingData` method with `patchBodySchemas[execution.type]!` at line 134
- packages/workflow-executor/src/pending-data-validators.ts lines 5-43 (REVIEWED_COMMIT): `patchBodySchemas` defined as `Partial<Record<StepExecutionData['type'], z.ZodTypeAny>>` with only 5 step types having schemas
- packages/workflow-executor/src/types/step-execution-data.ts lines 140-150 (REVIEWED_COMMIT): `StepExecutionData` union type includes 8 types, 3 of which (`condition`, `read-record`, `record`) have no schema in `patchBodySchemas`
- packages/workflow-executor/src/executors/guidance-step-executor.ts lines 17-20 (REVIEWED_COMMIT): Shows correct pattern with explicit `if (!schema)` check before use

pendingData?: unknown,
): Promise<TExec | undefined> {
const { type } = this.context.stepDefinition;
const execution = await this.findPendingExecution<TExec>(type);

if (pendingData !== undefined && execution) {
const schema = patchBodySchemas[execution.type]!;

Check warning on line 134 in packages/workflow-executor/src/executors/base-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Forbidden non-null assertion
const parsed = schema.safeParse(pendingData);

if (!parsed.success) {
throw new StepStateError(
`Invalid pending data: ${parsed.error.issues.map(i => i.message).join(', ')}`,
);
}

const updated = {
...execution,
pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) },
} as TExec;

await this.context.runStore.saveStepExecution(
this.context.runId,
updated as StepExecutionData,
);

return updated;
}

return execution;
}

/**
* Shared confirmation flow for executors that require user approval before acting.
* Receives a pre-loaded execution (from findPendingExecution) and checks pendingData.userConfirmed:
Expand Down
49 changes: 49 additions & 0 deletions packages/workflow-executor/src/executors/guidance-step-executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import type { StepExecutionResult } from '../types/execution';
import type { GuidanceStepDefinition } from '../types/step-definition';
import type { BaseStepStatus } from '../types/step-outcome';

import { StepStateError } from '../errors';
import patchBodySchemas from '../pending-data-validators';
import BaseStepExecutor from './base-step-executor';

export default class GuidanceStepExecutor extends BaseStepExecutor<GuidanceStepDefinition> {
protected async doExecute(): Promise<StepExecutionResult> {
const { incomingPendingData } = this.context;

if (!incomingPendingData) {
throw new StepStateError('Guidance step triggered without pending data');
}

const parsed = patchBodySchemas.guidance.safeParse(incomingPendingData);

if (!parsed.success) {
throw new StepStateError(
`Invalid guidance input: ${parsed.error.issues.map(i => i.message).join(', ')}`,
);
}

const { userInput } = parsed.data as { userInput: string };

await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'guidance',
stepIndex: this.context.stepIndex,
executionResult: { userInput },
});

return this.buildOutcomeResult({ status: 'success' });
}

protected buildOutcomeResult(outcome: {
status: BaseStepStatus;
error?: string;
}): StepExecutionResult {
return {
stepOutcome: {
type: 'guidance',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
...outcome,
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ interface RelationTarget extends RelationRef {
export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<LoadRelatedRecordStepDefinition> {
protected async doExecute(): Promise<StepExecutionResult> {
// Branch A -- Re-entry after pending execution found in RunStore
const pending = await this.findPendingExecution<LoadRelatedRecordStepExecutionData>(
'load-related-record',
const pending = await this.patchAndReloadPendingData<LoadRelatedRecordStepExecutionData>(
this.context.incomingPendingData,
);

if (pending) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>

protected async doExecute(): Promise<StepExecutionResult> {
// Branch A -- Re-entry after pending execution found in RunStore
const pending = await this.findPendingExecution<McpStepExecutionData>('mcp');
const pending = await this.patchAndReloadPendingData<McpStepExecutionData>(
this.context.incomingPendingData,
);

if (pending) {
return this.handleConfirmationFlow<McpStepExecutionData>(pending, execution =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
} from '../types/execution';
import type {
ConditionStepDefinition,
GuidanceStepDefinition,
LoadRelatedRecordStepDefinition,
McpStepDefinition,
ReadRecordStepDefinition,
Expand All @@ -22,6 +23,7 @@ import type { RemoteTool } from '@forestadmin/ai-proxy';

import { StepStateError, causeMessage } from '../errors';
import ConditionStepExecutor from './condition-step-executor';
import GuidanceStepExecutor from './guidance-step-executor';
import LoadRelatedRecordStepExecutor from './load-related-record-step-executor';
import McpStepExecutor from './mcp-step-executor';
import ReadRecordStepExecutor from './read-record-step-executor';
Expand All @@ -44,9 +46,10 @@ export default class StepExecutorFactory {
step: PendingStepExecution,
contextConfig: StepContextConfig,
loadTools: () => Promise<RemoteTool[]>,
incomingPendingData?: unknown,
): Promise<IStepExecutor> {
try {
const context = StepExecutorFactory.buildContext(step, contextConfig);
const context = StepExecutorFactory.buildContext(step, contextConfig, incomingPendingData);

switch (step.stepDefinition.type) {
case StepType.Condition:
Expand All @@ -70,6 +73,8 @@ export default class StepExecutorFactory {
context as ExecutionContext<McpStepDefinition>,
await loadTools(),
);
case StepType.Guidance:
return new GuidanceStepExecutor(context as ExecutionContext<GuidanceStepDefinition>);
default:
throw new StepStateError(
`Unknown step type: ${(step.stepDefinition as { type: string }).type}`,
Expand Down Expand Up @@ -102,6 +107,7 @@ export default class StepExecutorFactory {
private static buildContext(
step: PendingStepExecution,
cfg: StepContextConfig,
incomingPendingData?: unknown,
): ExecutionContext {
return {
...step,
Expand All @@ -111,6 +117,7 @@ export default class StepExecutorFactory {
runStore: cfg.runStore,
schemaCache: cfg.schemaCache,
logger: cfg.logger,
incomingPendingData,
};
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {
GuidanceStepExecutionData,
LoadRelatedRecordStepExecutionData,
McpStepExecutionData,
StepExecutionData,
Expand All @@ -23,6 +24,8 @@ export default class StepExecutionFormatters {
return StepExecutionFormatters.formatLoadRelatedRecord(execution);
case 'mcp':
return StepExecutionFormatters.formatMcp(execution as McpStepExecutionData);
case 'guidance':
return StepExecutionFormatters.formatGuidance(execution as GuidanceStepExecutionData);
default:
return null;
}
Expand All @@ -42,6 +45,12 @@ export default class StepExecutionFormatters {
return ` Executed: ${toolName} (result not summarized)`;
}

private static formatGuidance(execution: GuidanceStepExecutionData): string | null {
if (!execution.executionResult) return null;

return ` The user provided the following input: "${execution.executionResult.userInput}"`;
}

private static formatLoadRelatedRecord(
execution: LoadRelatedRecordStepExecutionData,
): string | null {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export default class StepSummaryBuilder {
if (customLine !== null) {
lines.push(customLine);
} else {
if (execution.executionParams !== undefined) {
if ('executionParams' in execution && execution.executionParams !== undefined) {
lines.push(` Input: ${JSON.stringify(execution.executionParams)}`);
} else if ('pendingData' in execution && execution.pendingData !== undefined) {
lines.push(` Pending: ${JSON.stringify(execution.pendingData)}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ interface ActionTarget extends ActionRef {
export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<TriggerActionStepDefinition> {
protected async doExecute(): Promise<StepExecutionResult> {
// Branch A -- Re-entry after pending execution found in RunStore
const pending = await this.findPendingExecution<TriggerRecordActionStepExecutionData>(
'trigger-action',
const pending = await this.patchAndReloadPendingData<TriggerRecordActionStepExecutionData>(
this.context.incomingPendingData,
);

if (pending) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ interface UpdateTarget extends FieldRef {
export default class UpdateRecordStepExecutor extends RecordStepExecutor<UpdateRecordStepDefinition> {
protected async doExecute(): Promise<StepExecutionResult> {
// Branch A -- Re-entry after pending execution found in RunStore
const pending = await this.findPendingExecution<UpdateRecordStepExecutionData>('update-record');
const pending = await this.patchAndReloadPendingData<UpdateRecordStepExecutionData>(
this.context.incomingPendingData,
);

if (pending) {
return this.handleConfirmationFlow<UpdateRecordStepExecutionData>(pending, async exec => {
Expand Down
21 changes: 1 addition & 20 deletions packages/workflow-executor/src/http/executor-http-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@ import Koa from 'koa';
import koaJwt from 'koa-jwt';

import ConsoleLogger from '../adapters/console-logger';
import {
InvalidPendingDataError,
PendingDataNotFoundError,
RunNotFoundError,
UserMismatchError,
} from '../errors';
import { RunNotFoundError, UserMismatchError } from '../errors';

export interface ExecutorHttpServerOptions {
port: number;
Expand Down Expand Up @@ -198,20 +193,6 @@ export default class ExecutorHttpServer {
return;
}

if (err instanceof PendingDataNotFoundError) {
ctx.status = 404;
ctx.body = { error: 'Step execution not found or has no pending data' };

return;
}

if (err instanceof InvalidPendingDataError) {
ctx.status = 400;
ctx.body = { error: 'Invalid request body', details: err.issues };

return;
}

throw err;
}

Expand Down
4 changes: 4 additions & 0 deletions packages/workflow-executor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type {
LoadRelatedRecordStepDefinition,
RecordStepDefinition,
McpStepDefinition,
GuidanceStepDefinition,
StepDefinition,
} from './types/step-definition';

Expand All @@ -15,6 +16,7 @@ export type {
ConditionStepOutcome,
RecordStepOutcome,
McpStepOutcome,
GuidanceStepOutcome,
StepOutcome,
} from './types/step-outcome';

Expand All @@ -35,6 +37,7 @@ export type {
McpToolRef,
McpToolCall,
McpStepExecutionData,
GuidanceStepExecutionData,
ExecutedStepExecutionData,
StepExecutionData,
} from './types/step-execution-data';
Expand Down Expand Up @@ -101,6 +104,7 @@ export { default as UpdateRecordStepExecutor } from './executors/update-record-s
export { default as TriggerRecordActionStepExecutor } from './executors/trigger-record-action-step-executor';
export { default as LoadRelatedRecordStepExecutor } from './executors/load-related-record-step-executor';
export { default as McpStepExecutor } from './executors/mcp-step-executor';
export { default as GuidanceStepExecutor } from './executors/guidance-step-executor';
export { default as AgentClientAgentPort } from './adapters/agent-client-agent-port';
export { default as ForestServerWorkflowPort } from './adapters/forest-server-workflow-port';
export { default as ExecutorHttpServer } from './http/executor-http-server';
Expand Down
6 changes: 6 additions & 0 deletions packages/workflow-executor/src/pending-data-validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const patchBodySchemas: Partial<Record<StepExecutionData['type'], z.ZodTypeAny>>
})
.strict(),
// relatedCollectionName and suggestedFields are NOT accepted — internal executor data.

guidance: z
.object({
userInput: z.string().min(1),
})
.strict(),
};

export default patchBodySchemas;
Loading
Loading