diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 905e40c15e..6287614aa3 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -67,7 +67,8 @@ src/ │ ├── read-record-step-executor.ts # AI-powered record field reading step │ ├── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow) │ ├── trigger-record-action-step-executor.ts # AI-powered action trigger step (with confirmation flow) -│ └── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow) +│ ├── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow) +│ └── guidance-step-executor.ts # Manual guidance step (saves user input, no AI) ├── http/ # HTTP server (optional, for frontend data access) │ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger └── index.ts # Barrel exports diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 759565d9ce..6d0a7af43b 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -17,6 +17,7 @@ import { StepStateError, WorkflowExecutorError, } from '../errors'; +import patchBodySchemas from '../pending-data-validators'; import SafeAgentPort from './safe-agent-port'; import StepSummaryBuilder from './summary/step-summary-builder'; @@ -118,6 +119,43 @@ export default abstract class BaseStepExecutor( + pendingData?: unknown, + ): Promise { + const { type } = this.context.stepDefinition; + const execution = await this.findPendingExecution(type); + + if (pendingData !== undefined && execution) { + const schema = patchBodySchemas[execution.type]!; + const parsed = schema.safeParse(pendingData); + + if (!parsed.success) { + throw new StepStateError( + `Invalid pending data: ${parsed.error.issues.map(i => i.message).join(', ')}`, + ); + } + + const updated = { + ...execution, + pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) }, + } as TExec; + + await this.context.runStore.saveStepExecution( + this.context.runId, + updated as StepExecutionData, + ); + + return updated; + } + + return execution; + } + /** * Shared confirmation flow for executors that require user approval before acting. * Receives a pre-loaded execution (from findPendingExecution) and checks pendingData.userConfirmed: diff --git a/packages/workflow-executor/src/executors/guidance-step-executor.ts b/packages/workflow-executor/src/executors/guidance-step-executor.ts new file mode 100644 index 0000000000..3b4e67adb5 --- /dev/null +++ b/packages/workflow-executor/src/executors/guidance-step-executor.ts @@ -0,0 +1,49 @@ +import type { StepExecutionResult } from '../types/execution'; +import type { GuidanceStepDefinition } from '../types/step-definition'; +import type { BaseStepStatus } from '../types/step-outcome'; + +import { StepStateError } from '../errors'; +import patchBodySchemas from '../pending-data-validators'; +import BaseStepExecutor from './base-step-executor'; + +export default class GuidanceStepExecutor extends BaseStepExecutor { + protected async doExecute(): Promise { + const { incomingPendingData } = this.context; + + if (!incomingPendingData) { + throw new StepStateError('Guidance step triggered without pending data'); + } + + const parsed = patchBodySchemas.guidance.safeParse(incomingPendingData); + + if (!parsed.success) { + throw new StepStateError( + `Invalid guidance input: ${parsed.error.issues.map(i => i.message).join(', ')}`, + ); + } + + const { userInput } = parsed.data as { userInput: string }; + + await this.context.runStore.saveStepExecution(this.context.runId, { + type: 'guidance', + stepIndex: this.context.stepIndex, + executionResult: { userInput }, + }); + + return this.buildOutcomeResult({ status: 'success' }); + } + + protected buildOutcomeResult(outcome: { + status: BaseStepStatus; + error?: string; + }): StepExecutionResult { + return { + stepOutcome: { + type: 'guidance', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + ...outcome, + }, + }; + } +} diff --git a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts index a7db4ab5d5..1a85027376 100644 --- a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts @@ -39,8 +39,8 @@ interface RelationTarget extends RelationRef { export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor { protected async doExecute(): Promise { // Branch A -- Re-entry after pending execution found in RunStore - const pending = await this.findPendingExecution( - 'load-related-record', + const pending = await this.patchAndReloadPendingData( + this.context.incomingPendingData, ); if (pending) { diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 454cd57393..b5fdd4e1e9 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -46,7 +46,9 @@ export default class McpStepExecutor extends BaseStepExecutor protected async doExecute(): Promise { // Branch A -- Re-entry after pending execution found in RunStore - const pending = await this.findPendingExecution('mcp'); + const pending = await this.patchAndReloadPendingData( + this.context.incomingPendingData, + ); if (pending) { return this.handleConfirmationFlow(pending, execution => diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index ed0cf1249f..25f027b3a9 100644 --- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -12,6 +12,7 @@ import type { } from '../types/execution'; import type { ConditionStepDefinition, + GuidanceStepDefinition, LoadRelatedRecordStepDefinition, McpStepDefinition, ReadRecordStepDefinition, @@ -22,6 +23,7 @@ import type { RemoteTool } from '@forestadmin/ai-proxy'; import { StepStateError, causeMessage } from '../errors'; import ConditionStepExecutor from './condition-step-executor'; +import GuidanceStepExecutor from './guidance-step-executor'; import LoadRelatedRecordStepExecutor from './load-related-record-step-executor'; import McpStepExecutor from './mcp-step-executor'; import ReadRecordStepExecutor from './read-record-step-executor'; @@ -44,9 +46,10 @@ export default class StepExecutorFactory { step: PendingStepExecution, contextConfig: StepContextConfig, loadTools: () => Promise, + incomingPendingData?: unknown, ): Promise { try { - const context = StepExecutorFactory.buildContext(step, contextConfig); + const context = StepExecutorFactory.buildContext(step, contextConfig, incomingPendingData); switch (step.stepDefinition.type) { case StepType.Condition: @@ -70,6 +73,8 @@ export default class StepExecutorFactory { context as ExecutionContext, await loadTools(), ); + case StepType.Guidance: + return new GuidanceStepExecutor(context as ExecutionContext); default: throw new StepStateError( `Unknown step type: ${(step.stepDefinition as { type: string }).type}`, @@ -102,6 +107,7 @@ export default class StepExecutorFactory { private static buildContext( step: PendingStepExecution, cfg: StepContextConfig, + incomingPendingData?: unknown, ): ExecutionContext { return { ...step, @@ -111,6 +117,7 @@ export default class StepExecutorFactory { runStore: cfg.runStore, schemaCache: cfg.schemaCache, logger: cfg.logger, + incomingPendingData, }; } } diff --git a/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts b/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts index 50a445f1b3..bdfd8938cf 100644 --- a/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts +++ b/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts @@ -1,4 +1,5 @@ import type { + GuidanceStepExecutionData, LoadRelatedRecordStepExecutionData, McpStepExecutionData, StepExecutionData, @@ -23,6 +24,8 @@ export default class StepExecutionFormatters { return StepExecutionFormatters.formatLoadRelatedRecord(execution); case 'mcp': return StepExecutionFormatters.formatMcp(execution as McpStepExecutionData); + case 'guidance': + return StepExecutionFormatters.formatGuidance(execution as GuidanceStepExecutionData); default: return null; } @@ -42,6 +45,12 @@ export default class StepExecutionFormatters { return ` Executed: ${toolName} (result not summarized)`; } + private static formatGuidance(execution: GuidanceStepExecutionData): string | null { + if (!execution.executionResult) return null; + + return ` The user provided the following input: "${execution.executionResult.userInput}"`; + } + private static formatLoadRelatedRecord( execution: LoadRelatedRecordStepExecutionData, ): string | null { diff --git a/packages/workflow-executor/src/executors/summary/step-summary-builder.ts b/packages/workflow-executor/src/executors/summary/step-summary-builder.ts index abe155924f..c593545fed 100644 --- a/packages/workflow-executor/src/executors/summary/step-summary-builder.ts +++ b/packages/workflow-executor/src/executors/summary/step-summary-builder.ts @@ -23,7 +23,7 @@ export default class StepSummaryBuilder { if (customLine !== null) { lines.push(customLine); } else { - if (execution.executionParams !== undefined) { + if ('executionParams' in execution && execution.executionParams !== undefined) { lines.push(` Input: ${JSON.stringify(execution.executionParams)}`); } else if ('pendingData' in execution && execution.pendingData !== undefined) { lines.push(` Pending: ${JSON.stringify(execution.pendingData)}`); diff --git a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts index dffb1bdd1d..6837a49eb3 100644 --- a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts +++ b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts @@ -24,8 +24,8 @@ interface ActionTarget extends ActionRef { export default class TriggerRecordActionStepExecutor extends RecordStepExecutor { protected async doExecute(): Promise { // Branch A -- Re-entry after pending execution found in RunStore - const pending = await this.findPendingExecution( - 'trigger-action', + const pending = await this.patchAndReloadPendingData( + this.context.incomingPendingData, ); if (pending) { diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 8edbfc6ea0..2f8d94a08e 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -30,7 +30,9 @@ interface UpdateTarget extends FieldRef { export default class UpdateRecordStepExecutor extends RecordStepExecutor { protected async doExecute(): Promise { // Branch A -- Re-entry after pending execution found in RunStore - const pending = await this.findPendingExecution('update-record'); + const pending = await this.patchAndReloadPendingData( + this.context.incomingPendingData, + ); if (pending) { return this.handleConfirmationFlow(pending, async exec => { diff --git a/packages/workflow-executor/src/http/executor-http-server.ts b/packages/workflow-executor/src/http/executor-http-server.ts index e9f3cec9b6..add56f3b3d 100644 --- a/packages/workflow-executor/src/http/executor-http-server.ts +++ b/packages/workflow-executor/src/http/executor-http-server.ts @@ -11,12 +11,7 @@ import Koa from 'koa'; import koaJwt from 'koa-jwt'; import ConsoleLogger from '../adapters/console-logger'; -import { - InvalidPendingDataError, - PendingDataNotFoundError, - RunNotFoundError, - UserMismatchError, -} from '../errors'; +import { RunNotFoundError, UserMismatchError } from '../errors'; export interface ExecutorHttpServerOptions { port: number; @@ -198,20 +193,6 @@ export default class ExecutorHttpServer { return; } - if (err instanceof PendingDataNotFoundError) { - ctx.status = 404; - ctx.body = { error: 'Step execution not found or has no pending data' }; - - return; - } - - if (err instanceof InvalidPendingDataError) { - ctx.status = 400; - ctx.body = { error: 'Invalid request body', details: err.issues }; - - return; - } - throw err; } diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index ea367b7229..4c4b9ae243 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -7,6 +7,7 @@ export type { LoadRelatedRecordStepDefinition, RecordStepDefinition, McpStepDefinition, + GuidanceStepDefinition, StepDefinition, } from './types/step-definition'; @@ -15,6 +16,7 @@ export type { ConditionStepOutcome, RecordStepOutcome, McpStepOutcome, + GuidanceStepOutcome, StepOutcome, } from './types/step-outcome'; @@ -35,6 +37,7 @@ export type { McpToolRef, McpToolCall, McpStepExecutionData, + GuidanceStepExecutionData, ExecutedStepExecutionData, StepExecutionData, } from './types/step-execution-data'; @@ -101,6 +104,7 @@ export { default as UpdateRecordStepExecutor } from './executors/update-record-s export { default as TriggerRecordActionStepExecutor } from './executors/trigger-record-action-step-executor'; export { default as LoadRelatedRecordStepExecutor } from './executors/load-related-record-step-executor'; export { default as McpStepExecutor } from './executors/mcp-step-executor'; +export { default as GuidanceStepExecutor } from './executors/guidance-step-executor'; export { default as AgentClientAgentPort } from './adapters/agent-client-agent-port'; export { default as ForestServerWorkflowPort } from './adapters/forest-server-workflow-port'; export { default as ExecutorHttpServer } from './http/executor-http-server'; diff --git a/packages/workflow-executor/src/pending-data-validators.ts b/packages/workflow-executor/src/pending-data-validators.ts index 1a02e1989e..cb3957918b 100644 --- a/packages/workflow-executor/src/pending-data-validators.ts +++ b/packages/workflow-executor/src/pending-data-validators.ts @@ -32,6 +32,12 @@ const patchBodySchemas: Partial> }) .strict(), // relatedCollectionName and suggestedFields are NOT accepted — internal executor data. + + guidance: z + .object({ + userInput: z.string().min(1), + }) + .strict(), }; export default patchBodySchemas; diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index b661392716..41fadee427 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -10,15 +10,8 @@ import type { StepExecutionData } from './types/step-execution-data'; import type { RemoteTool } from '@forestadmin/ai-proxy'; import ConsoleLogger from './adapters/console-logger'; -import { - InvalidPendingDataError, - PendingDataNotFoundError, - RunNotFoundError, - UserMismatchError, - causeMessage, -} from './errors'; +import { RunNotFoundError, UserMismatchError, causeMessage } from './errors'; import StepExecutorFactory from './executors/step-executor-factory'; -import patchBodySchemas from './pending-data-validators'; import validateSecrets from './validate-secrets'; export type RunnerState = 'idle' | 'running' | 'draining' | 'stopped'; @@ -146,33 +139,6 @@ export default class Runner { return this.config.runStore.getStepExecutions(runId); } - private async patchPendingData(runId: string, stepIndex: number, body: unknown): Promise { - const stepExecutions = await this.config.runStore.getStepExecutions(runId); - const execution = stepExecutions.find(e => e.stepIndex === stepIndex); - const schema = execution ? patchBodySchemas[execution.type] : undefined; - - if (!execution || !schema || !('pendingData' in execution) || execution.pendingData == null) { - throw new PendingDataNotFoundError(runId, stepIndex); - } - - const parsed = schema.safeParse(body); - - if (!parsed.success) { - throw new InvalidPendingDataError( - parsed.error.issues.map(({ path, message, code }) => ({ - path: path as (string | number)[], - message, - code, - })), - ); - } - - await this.config.runStore.saveStepExecution(runId, { - ...execution, - pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) }, - } as StepExecutionData); - } - async triggerPoll( runId: string, options?: { pendingData?: unknown; bearerUserId?: number }, @@ -185,13 +151,9 @@ export default class Runner { throw new UserMismatchError(runId); } - if (options?.pendingData !== undefined) { - await this.patchPendingData(runId, step.stepIndex, options.pendingData); - } - if (this.inFlightSteps.has(Runner.stepKey(step))) return; - await this.executeStep(step); + await this.executeStep(step, options?.pendingData); } private schedulePoll(): void { @@ -226,20 +188,27 @@ export default class Runner { return this.config.aiModelPort.loadRemoteTools(mergedConfig); } - private executeStep(step: PendingStepExecution): Promise { + private executeStep(step: PendingStepExecution, incomingPendingData?: unknown): Promise { const key = Runner.stepKey(step); - const promise = this.doExecuteStep(step, key); + const promise = this.doExecuteStep(step, key, incomingPendingData); this.inFlightSteps.set(key, promise); return promise; } - private async doExecuteStep(step: PendingStepExecution, key: string): Promise { + private async doExecuteStep( + step: PendingStepExecution, + key: string, + incomingPendingData?: unknown, + ): Promise { let result: StepExecutionResult; try { - const executor = await StepExecutorFactory.create(step, this.contextConfig, () => - this.fetchRemoteTools(), + const executor = await StepExecutorFactory.create( + step, + this.contextConfig, + () => this.fetchRemoteTools(), + incomingPendingData, ); result = await executor.execute(); } catch (error) { diff --git a/packages/workflow-executor/src/types/execution.ts b/packages/workflow-executor/src/types/execution.ts index 7251928081..f1cdbd3efa 100644 --- a/packages/workflow-executor/src/types/execution.ts +++ b/packages/workflow-executor/src/types/execution.ts @@ -60,4 +60,5 @@ export interface ExecutionContext readonly schemaCache: SchemaCache; readonly previousSteps: ReadonlyArray>; readonly logger: Logger; + readonly incomingPendingData?: unknown; } diff --git a/packages/workflow-executor/src/types/step-definition.ts b/packages/workflow-executor/src/types/step-definition.ts index 70782b8d4b..14a27516e4 100644 --- a/packages/workflow-executor/src/types/step-definition.ts +++ b/packages/workflow-executor/src/types/step-definition.ts @@ -7,6 +7,7 @@ export enum StepType { TriggerAction = 'trigger-action', LoadRelatedRecord = 'load-related-record', Mcp = 'mcp', + Guidance = 'guidance', } interface BaseStepDefinition { @@ -68,10 +69,18 @@ export interface McpStepDefinition extends BaseStepDefinition { automaticExecution?: boolean; } +export interface GuidanceStepDefinition extends BaseStepDefinition { + type: StepType.Guidance; +} + export type RecordStepDefinition = | ReadRecordStepDefinition | UpdateRecordStepDefinition | TriggerActionStepDefinition | LoadRelatedRecordStepDefinition; -export type StepDefinition = ConditionStepDefinition | RecordStepDefinition | McpStepDefinition; +export type StepDefinition = + | ConditionStepDefinition + | RecordStepDefinition + | McpStepDefinition + | GuidanceStepDefinition; diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index f2349d28be..31ec78ec4e 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -137,6 +137,14 @@ export interface LoadRelatedRecordStepExecutionData extends BaseStepExecutionDat executionResult?: { relation: RelationRef; record: RecordRef } | { skipped: true }; } +// -- Guidance -- + +export interface GuidanceStepExecutionData extends BaseStepExecutionData { + type: 'guidance'; + pendingData?: { userInput?: string }; + executionResult?: { userInput: string }; +} + // -- Union -- export type StepExecutionData = @@ -146,7 +154,8 @@ export type StepExecutionData = | TriggerRecordActionStepExecutionData | RecordStepExecutionData | LoadRelatedRecordStepExecutionData - | McpStepExecutionData; + | McpStepExecutionData + | GuidanceStepExecutionData; /** Alias for StepExecutionData — kept for backwards-compatible consumption at the call sites. */ export type ExecutedStepExecutionData = StepExecutionData; diff --git a/packages/workflow-executor/src/types/step-outcome.ts b/packages/workflow-executor/src/types/step-outcome.ts index f2fc45793f..151582b101 100644 --- a/packages/workflow-executor/src/types/step-outcome.ts +++ b/packages/workflow-executor/src/types/step-outcome.ts @@ -39,11 +39,21 @@ export interface McpStepOutcome extends BaseStepOutcome { status: RecordStepStatus; } -export type StepOutcome = ConditionStepOutcome | RecordStepOutcome | McpStepOutcome; +export interface GuidanceStepOutcome extends BaseStepOutcome { + type: 'guidance'; + status: BaseStepStatus; +} + +export type StepOutcome = + | ConditionStepOutcome + | RecordStepOutcome + | McpStepOutcome + | GuidanceStepOutcome; -export function stepTypeToOutcomeType(type: StepType): 'condition' | 'record' | 'mcp' { +export function stepTypeToOutcomeType(type: StepType): 'condition' | 'record' | 'mcp' | 'guidance' { if (type === StepType.Condition) return 'condition'; if (type === StepType.Mcp) return 'mcp'; + if (type === StepType.Guidance) return 'guidance'; return 'record'; } diff --git a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts new file mode 100644 index 0000000000..50a1c25f81 --- /dev/null +++ b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts @@ -0,0 +1,117 @@ +import type { RunStore } from '../../src/ports/run-store'; +import type { ExecutionContext } from '../../src/types/execution'; +import type { RecordRef } from '../../src/types/record'; +import type { GuidanceStepDefinition } from '../../src/types/step-definition'; +import type { GuidanceStepOutcome } from '../../src/types/step-outcome'; + +import GuidanceStepExecutor from '../../src/executors/guidance-step-executor'; +import SchemaCache from '../../src/schema-cache'; +import { StepType } from '../../src/types/step-definition'; + +function makeMockRunStore(overrides: Partial = {}): RunStore { + return { + init: jest.fn().mockResolvedValue(undefined), + close: jest.fn().mockResolvedValue(undefined), + getStepExecutions: jest.fn().mockResolvedValue([]), + saveStepExecution: jest.fn().mockResolvedValue(undefined), + ...overrides, + }; +} + +function makeContext( + overrides: Partial> = {}, +): ExecutionContext { + return { + runId: 'run-1', + stepId: 'guidance-1', + stepIndex: 0, + baseRecordRef: { + collectionName: 'customers', + recordId: [1], + stepIndex: 0, + } as RecordRef, + stepDefinition: { type: StepType.Guidance }, + model: {} as ExecutionContext['model'], + agentPort: {} as ExecutionContext['agentPort'], + workflowPort: {} as ExecutionContext['workflowPort'], + runStore: makeMockRunStore(), + user: { + id: 1, + email: 'test@example.com', + firstName: 'Test', + lastName: 'User', + team: 'admin', + renderingId: 1, + role: 'admin', + permissionLevel: 'admin', + tags: {}, + }, + schemaCache: new SchemaCache(), + previousSteps: [], + logger: { info: jest.fn(), error: jest.fn() }, + ...overrides, + }; +} + +describe('GuidanceStepExecutor', () => { + it('saves executionResult and returns success when incomingPendingData has valid userInput', async () => { + const runStore = makeMockRunStore(); + + const executor = new GuidanceStepExecutor( + makeContext({ runStore, incomingPendingData: { userInput: 'Please proceed with option A' } }), + ); + const result = await executor.execute(); + + const outcome = result.stepOutcome as GuidanceStepOutcome; + expect(outcome.type).toBe('guidance'); + expect(outcome.status).toBe('success'); + expect(outcome.stepId).toBe('guidance-1'); + expect(outcome.stepIndex).toBe(0); + + expect(runStore.saveStepExecution).toHaveBeenCalledWith('run-1', { + type: 'guidance', + stepIndex: 0, + executionResult: { userInput: 'Please proceed with option A' }, + }); + }); + + it('returns error outcome when incomingPendingData is undefined', async () => { + const runStore = makeMockRunStore(); + + const executor = new GuidanceStepExecutor(makeContext({ runStore })); + const result = await executor.execute(); + + const outcome = result.stepOutcome as GuidanceStepOutcome; + expect(outcome.type).toBe('guidance'); + expect(outcome.status).toBe('error'); + expect(outcome.error).toBe('An unexpected error occurred while processing this step.'); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + + it('returns error outcome when incomingPendingData has empty userInput', async () => { + const runStore = makeMockRunStore(); + + const executor = new GuidanceStepExecutor( + makeContext({ runStore, incomingPendingData: { userInput: '' } }), + ); + const result = await executor.execute(); + + const outcome = result.stepOutcome as GuidanceStepOutcome; + expect(outcome.type).toBe('guidance'); + expect(outcome.status).toBe('error'); + expect(outcome.error).toBe('An unexpected error occurred while processing this step.'); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + + it('returns error outcome when incomingPendingData has no userInput field', async () => { + const runStore = makeMockRunStore(); + + const executor = new GuidanceStepExecutor(makeContext({ runStore, incomingPendingData: {} })); + const result = await executor.execute(); + + const outcome = result.stepOutcome as GuidanceStepOutcome; + expect(outcome.type).toBe('guidance'); + expect(outcome.status).toBe('error'); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/workflow-executor/test/executors/step-execution-formatters.test.ts b/packages/workflow-executor/test/executors/step-execution-formatters.test.ts index 97301e297e..9a79847e02 100644 --- a/packages/workflow-executor/test/executors/step-execution-formatters.test.ts +++ b/packages/workflow-executor/test/executors/step-execution-formatters.test.ts @@ -144,5 +144,28 @@ describe('StepExecutionFormatters', () => { expect(StepExecutionFormatters.format(execution)).toBeNull(); }); }); + + describe('guidance', () => { + it('returns the user input line when executionResult is present', () => { + const execution: StepExecutionData = { + type: 'guidance', + stepIndex: 0, + executionResult: { userInput: 'I called the client and confirmed the delivery date.' }, + }; + + expect(StepExecutionFormatters.format(execution)).toBe( + ' The user provided the following input: "I called the client and confirmed the delivery date."', + ); + }); + + it('returns null when executionResult is absent', () => { + const execution: StepExecutionData = { + type: 'guidance', + stepIndex: 0, + }; + + expect(StepExecutionFormatters.format(execution)).toBeNull(); + }); + }); }); }); diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 6e0594fcac..a298d546d2 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -987,4 +987,29 @@ describe('UpdateRecordStepExecutor', () => { expect(result.stepOutcome.status).toBe('error'); }); }); + + describe('patchAndReloadPendingData validation', () => { + it('returns error when incomingPendingData fails Zod validation', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'update-record', + stepIndex: 0, + pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + + const context = makeContext({ + runStore, + incomingPendingData: { invalidField: true }, + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + }); + }); }); diff --git a/packages/workflow-executor/test/http/executor-http-server.test.ts b/packages/workflow-executor/test/http/executor-http-server.test.ts index 9981b96be8..97c2341218 100644 --- a/packages/workflow-executor/test/http/executor-http-server.test.ts +++ b/packages/workflow-executor/test/http/executor-http-server.test.ts @@ -4,12 +4,7 @@ import type Runner from '../../src/runner'; import jsonwebtoken from 'jsonwebtoken'; import request from 'supertest'; -import { - InvalidPendingDataError, - PendingDataNotFoundError, - RunNotFoundError, - UserMismatchError, -} from '../../src/errors'; +import { RunNotFoundError, UserMismatchError } from '../../src/errors'; import ExecutorHttpServer from '../../src/http/executor-http-server'; const AUTH_SECRET = 'test-auth-secret'; @@ -403,41 +398,6 @@ describe('ExecutorHttpServer', () => { expect(response.body).toEqual({ error: 'Forbidden' }); }); - it('returns 404 when triggerPoll rejects with PendingDataNotFoundError', async () => { - const runner = createMockRunner({ - triggerPoll: jest.fn().mockRejectedValue(new PendingDataNotFoundError('run-1', 0)), - }); - - const server = createServer({ runner }); - const token = signToken({ id: 1 }); - - const response = await request(server.callback) - .post('/runs/run-1/trigger') - .set('Authorization', `Bearer ${token}`); - - expect(response.status).toBe(404); - expect(response.body).toEqual({ error: 'Step execution not found or has no pending data' }); - }); - - it('returns 400 when triggerPoll rejects with InvalidPendingDataError', async () => { - const issues = [ - { path: ['userConfirmed'], message: 'Expected boolean', code: 'invalid_type' }, - ]; - const runner = createMockRunner({ - triggerPoll: jest.fn().mockRejectedValue(new InvalidPendingDataError(issues)), - }); - - const server = createServer({ runner }); - const token = signToken({ id: 1 }); - - const response = await request(server.callback) - .post('/runs/run-1/trigger') - .set('Authorization', `Bearer ${token}`); - - expect(response.status).toBe(400); - expect(response.body).toEqual({ error: 'Invalid request body', details: issues }); - }); - it('returns 500 when triggerPoll rejects with an unexpected error', async () => { const runner = createMockRunner({ triggerPoll: jest.fn().mockRejectedValue(new Error('unexpected')), diff --git a/packages/workflow-executor/test/index.test.ts b/packages/workflow-executor/test/index.test.ts index 585d0c0b72..bd7e4a6216 100644 --- a/packages/workflow-executor/test/index.test.ts +++ b/packages/workflow-executor/test/index.test.ts @@ -3,9 +3,9 @@ import { StepType } from '../src/index'; jest.mock('@langchain/openai', () => ({ ChatOpenAI: jest.fn() })); describe('StepType', () => { - it('should expose exactly 6 step types', () => { + it('should expose exactly 7 step types', () => { const values = Object.values(StepType); - expect(values).toHaveLength(6); + expect(values).toHaveLength(7); }); it.each([ @@ -15,6 +15,7 @@ describe('StepType', () => { ['TriggerAction', 'trigger-action'], ['LoadRelatedRecord', 'load-related-record'], ['Mcp', 'mcp'], + ['Guidance', 'guidance'], ] as const)('should have %s = "%s"', (key, value) => { expect(StepType[key]).toBe(value); }); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 12d6008a4f..dc59ceb8dc 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -8,15 +8,10 @@ import type { PendingStepExecution } from '../src/types/execution'; import type { StepDefinition } from '../src/types/step-definition'; import type { BaseChatModel } from '@forestadmin/ai-proxy'; -import { - ConfigurationError, - InvalidPendingDataError, - PendingDataNotFoundError, - RunNotFoundError, - UserMismatchError, -} from '../src/errors'; +import { ConfigurationError, RunNotFoundError, UserMismatchError } from '../src/errors'; import BaseStepExecutor from '../src/executors/base-step-executor'; import ConditionStepExecutor from '../src/executors/condition-step-executor'; +import GuidanceStepExecutor from '../src/executors/guidance-step-executor'; import LoadRelatedRecordStepExecutor from '../src/executors/load-related-record-step-executor'; import McpStepExecutor from '../src/executors/mcp-step-executor'; import ReadRecordStepExecutor from '../src/executors/read-record-step-executor'; @@ -115,7 +110,13 @@ function makeStepDefinition(stepType: StepType): StepDefinition { return { type: StepType.Mcp }; } - return { type: stepType as Exclude }; + if (stepType === StepType.Guidance) { + return { type: StepType.Guidance }; + } + + return { + type: stepType as Exclude, + }; } function makePendingStep( @@ -753,6 +754,12 @@ describe('StepExecutorFactory.create — factory', () => { expect(loadTools).toHaveBeenCalledTimes(1); }); + it('dispatches Guidance steps to GuidanceStepExecutor', async () => { + const step = makePendingStep({ stepType: StepType.Guidance }); + const executor = await StepExecutorFactory.create(step, makeContextConfig(), jest.fn()); + expect(executor).toBeInstanceOf(GuidanceStepExecutor); + }); + it('returns an executor with an error outcome for an unknown step type', async () => { const step = { ...makePendingStep(), @@ -1066,207 +1073,51 @@ describe('triggerPoll with options', () => { expect(executeSpy).toHaveBeenCalledTimes(1); }); - it('patches pending data then executes when pendingData is provided', async () => { + it('passes pendingData through to executor via context when provided', async () => { const workflowPort = createMockWorkflowPort(); const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const runStore = createMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([ - { - type: 'update-record', - stepIndex: 0, - pendingData: { fieldName: 'status', value: 'old' }, - }, - ]), - }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - - await runner.triggerPoll('run-1', { pendingData: { userConfirmed: true, value: 'new' } }); - - expect(runStore.saveStepExecution).toHaveBeenCalledWith( - 'run-1', - expect.objectContaining({ - pendingData: { fieldName: 'status', value: 'new', userConfirmed: true }, + const createSpy = jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: jest.fn().mockResolvedValue({ + stepOutcome: { type: 'record', stepId: 'step-1', stepIndex: 0, status: 'success' }, }), - ); - expect(executeSpy).toHaveBeenCalledTimes(1); - }); - - it('throws PendingDataNotFoundError when step is not found', async () => { - const workflowPort = createMockWorkflowPort(); - const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); - workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const runStore = createMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([]) }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - - await expect( - runner.triggerPoll('run-1', { pendingData: { userConfirmed: true } }), - ).rejects.toThrow(PendingDataNotFoundError); - }); - - it('throws PendingDataNotFoundError when step has no pendingData', async () => { - const workflowPort = createMockWorkflowPort(); - const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); - workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const runStore = createMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([{ type: 'update-record', stepIndex: 0 }]), - }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - - await expect( - runner.triggerPoll('run-1', { pendingData: { userConfirmed: true } }), - ).rejects.toThrow(PendingDataNotFoundError); - }); - - it('throws PendingDataNotFoundError when step type has no schema (e.g. condition)', async () => { - const workflowPort = createMockWorkflowPort(); - const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); - workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const runStore = createMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([{ type: 'condition', stepIndex: 0 }]), }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - - await expect( - runner.triggerPoll('run-1', { pendingData: { userConfirmed: true } }), - ).rejects.toThrow(PendingDataNotFoundError); - }); - it('throws InvalidPendingDataError with mapped issues when body fails Zod validation', async () => { - const workflowPort = createMockWorkflowPort(); - const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); - workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const runStore = createMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([ - { - type: 'update-record', - stepIndex: 0, - pendingData: { fieldName: 'status', value: 'active' }, - }, - ]), - }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - - const error = await runner - .triggerPoll('run-1', { pendingData: { userConfirmed: 'yes' } }) - .catch((e: unknown) => e); + runner = new Runner(createRunnerConfig({ workflowPort })); - expect(error).toBeInstanceOf(InvalidPendingDataError); - expect((error as InvalidPendingDataError).issues).toEqual( - expect.arrayContaining([ - expect.objectContaining({ path: ['userConfirmed'], code: expect.any(String) }), - ]), - ); - }); + await runner.triggerPoll('run-1', { pendingData: { userConfirmed: true, value: 'new' } }); - it('throws InvalidPendingDataError when body contains unknown fields', async () => { - const workflowPort = createMockWorkflowPort(); - const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); - workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const runStore = createMockRunStore({ - getStepExecutions: jest - .fn() - .mockResolvedValue([ - { type: 'trigger-action', stepIndex: 0, pendingData: { name: 'send_email' } }, - ]), + expect(createSpy).toHaveBeenCalledWith(step, expect.anything(), expect.any(Function), { + userConfirmed: true, + value: 'new', }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - await expect( - runner.triggerPoll('run-1', { - pendingData: { userConfirmed: true, extra: 'field' }, - }), - ).rejects.toThrow(InvalidPendingDataError); + createSpy.mockRestore(); }); - it('update-record: merges value override into pendingData and calls saveStepExecution', async () => { + it('passes undefined incomingPendingData when no pendingData option is provided', async () => { const workflowPort = createMockWorkflowPort(); const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const existing = { - type: 'update-record' as const, - stepIndex: 0, - pendingData: { fieldName: 'status', value: 'old_value' }, - }; - const runStore = createMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([existing]), - }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - await runner.triggerPoll('run-1', { - pendingData: { userConfirmed: true, value: 'new_value' }, - }); - - expect(runStore.saveStepExecution).toHaveBeenCalledWith( - 'run-1', - expect.objectContaining({ - type: 'update-record', - stepIndex: 0, - pendingData: { fieldName: 'status', value: 'new_value', userConfirmed: true }, + const createSpy = jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({ + execute: jest.fn().mockResolvedValue({ + stepOutcome: { type: 'record', stepId: 'step-1', stepIndex: 0, status: 'success' }, }), - ); - }); - - it('load-related-record: merges selectedRecordId override correctly', async () => { - const workflowPort = createMockWorkflowPort(); - const step = makePendingStep({ runId: 'run-1', stepIndex: 1 }); - workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const existing = { - type: 'load-related-record' as const, - stepIndex: 1, - pendingData: { - name: 'order', - displayName: 'Order', - selectedRecordId: [99], - suggestedFields: [], - }, - selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, - }; - const runStore = createMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([existing]), - }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); - - await runner.triggerPoll('run-1', { - pendingData: { userConfirmed: true, selectedRecordId: ['42'] }, }); - expect(runStore.saveStepExecution).toHaveBeenCalledWith( - 'run-1', - expect.objectContaining({ - pendingData: expect.objectContaining({ selectedRecordId: ['42'], userConfirmed: true }), - }), - ); - }); - - it('trigger-action: merges userConfirmed:true only, rejects extra field', async () => { - const workflowPort = createMockWorkflowPort(); - const step = makePendingStep({ runId: 'run-1', stepIndex: 0 }); - workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); - const existing = { - type: 'trigger-action' as const, - stepIndex: 0, - pendingData: { name: 'send_email', displayName: 'Send Email' }, - }; - const runStore = createMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([existing]), - }); - runner = new Runner(createRunnerConfig({ workflowPort, runStore })); + runner = new Runner(createRunnerConfig({ workflowPort })); - await runner.triggerPoll('run-1', { pendingData: { userConfirmed: true } }); + await runner.triggerPoll('run-1'); - expect(runStore.saveStepExecution).toHaveBeenCalledWith( - 'run-1', - expect.objectContaining({ - pendingData: expect.objectContaining({ userConfirmed: true }), - }), + expect(createSpy).toHaveBeenCalledWith( + step, + expect.anything(), + expect.any(Function), + undefined, ); - await expect( - runner.triggerPoll('run-1', { - pendingData: { userConfirmed: true, name: 'override' }, - }), - ).rejects.toThrow(InvalidPendingDataError); + createSpy.mockRestore(); }); }); diff --git a/packages/workflow-executor/test/types/step-outcome.test.ts b/packages/workflow-executor/test/types/step-outcome.test.ts index 47354209fd..7046a79e0c 100644 --- a/packages/workflow-executor/test/types/step-outcome.test.ts +++ b/packages/workflow-executor/test/types/step-outcome.test.ts @@ -26,6 +26,10 @@ describe('stepTypeToOutcomeType', () => { expect(stepTypeToOutcomeType(StepType.LoadRelatedRecord)).toBe('record'); }); + it('maps Guidance to guidance', () => { + expect(stepTypeToOutcomeType(StepType.Guidance)).toBe('guidance'); + }); + it('falls through to record for an unknown future step type', () => { expect(stepTypeToOutcomeType('future-step-type' as StepType)).toBe('record'); });