diff --git a/packages/agent/src/routes/index.ts b/packages/agent/src/routes/index.ts index 24476bee42..6d626180e2 100644 --- a/packages/agent/src/routes/index.ts +++ b/packages/agent/src/routes/index.ts @@ -31,6 +31,7 @@ import ScopeInvalidation from './security/scope-invalidation'; import ErrorHandling from './system/error-handling'; import HealthCheck from './system/healthcheck'; import Logger from './system/logger'; +import WorkflowExecutorProxyRoute from './workflow/workflow-executor-proxy'; export const ROOT_ROUTES_CTOR = [ Authentication, @@ -172,6 +173,12 @@ function getAiRoutes(options: Options, services: Services, aiRouter: AiRouter | return [new AiProxyRoute(services, options, aiRouter)]; } +function getWorkflowExecutorRoutes(options: Options, services: Services): BaseRoute[] { + if (!options.workflowExecutorUrl) return []; + + return [new WorkflowExecutorProxyRoute(services, options)]; +} + export default function makeRoutes( dataSource: DataSource, options: Options, @@ -187,6 +194,7 @@ export default function makeRoutes( ...getRelatedRoutes(dataSource, options, services), ...getActionRoutes(dataSource, options, services), ...getAiRoutes(options, services, aiRouter), + ...getWorkflowExecutorRoutes(options, services), ]; // Ensure routes and middlewares are loaded in the right order. diff --git a/packages/agent/src/routes/workflow/workflow-executor-proxy.ts b/packages/agent/src/routes/workflow/workflow-executor-proxy.ts new file mode 100644 index 0000000000..33e6294045 --- /dev/null +++ b/packages/agent/src/routes/workflow/workflow-executor-proxy.ts @@ -0,0 +1,87 @@ +import type { ForestAdminHttpDriverServices } from '../../services'; +import type { AgentOptionsWithDefaults } from '../../types'; +import type KoaRouter from '@koa/router'; +import type { Context } from 'koa'; + +import { request as httpRequest } from 'http'; +import { request as httpsRequest } from 'https'; + +import { HttpCode, RouteType } from '../../types'; +import BaseRoute from '../base-route'; + +export default class WorkflowExecutorProxyRoute extends BaseRoute { + readonly type = RouteType.PrivateRoute; + private readonly executorUrl: URL; + + constructor(services: ForestAdminHttpDriverServices, options: AgentOptionsWithDefaults) { + super(services, options); + // Remove trailing slash for clean URL joining + this.executorUrl = new URL(options.workflowExecutorUrl.replace(/\/+$/, '')); + } + + private static readonly AGENT_PREFIX = '/_internal/workflow-executions'; + private static readonly EXECUTOR_PREFIX = '/runs'; + + setupRoutes(router: KoaRouter): void { + router.get('/_internal/workflow-executions/:runId', this.handleProxy.bind(this)); + router.post('/_internal/workflow-executions/:runId/trigger', this.handleProxy.bind(this)); + router.patch( + '/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data', + this.handleProxy.bind(this), + ); + } + + private async handleProxy(context: Context): Promise { + // Rewrite /_internal/workflow-executions/... → /runs/... + const executorPath = context.path.replace( + WorkflowExecutorProxyRoute.AGENT_PREFIX, + WorkflowExecutorProxyRoute.EXECUTOR_PREFIX, + ); + const targetUrl = new URL(executorPath, this.executorUrl); + + const response = await this.forwardRequest(context.method, targetUrl, context.request.body); + + context.response.status = response.status; + context.response.body = response.body; + } + + private forwardRequest( + method: string, + url: URL, + body?: unknown, + ): Promise<{ status: number; body: unknown }> { + const requestFn = url.protocol === 'https:' ? httpsRequest : httpRequest; + + return new Promise((resolve, reject) => { + const req = requestFn( + url, + { method, headers: { 'Content-Type': 'application/json' } }, + res => { + const chunks: Uint8Array[] = []; + res.on('data', chunk => chunks.push(chunk)); + res.on('end', () => { + const raw = Buffer.concat(chunks).toString('utf-8'); + let parsed: unknown; + + try { + parsed = JSON.parse(raw); + } catch { + parsed = raw; + } + + resolve({ status: res.statusCode ?? HttpCode.InternalServerError, body: parsed }); + }); + res.on('error', reject); + }, + ); + + req.on('error', reject); + + if (body && method !== 'GET') { + req.write(JSON.stringify(body)); + } + + req.end(); + }); + } +} diff --git a/packages/agent/src/types.ts b/packages/agent/src/types.ts index d90d83b084..53785b8c25 100644 --- a/packages/agent/src/types.ts +++ b/packages/agent/src/types.ts @@ -45,6 +45,13 @@ export type AgentOptions = { */ ignoreMissingSchemaElementErrors?: boolean; useUnsafeActionEndpoint?: boolean; + /** + * Base URL of the workflow executor to proxy requests to. + * When set, the agent exposes routes at `/_internal/workflow-executions/` + * that forward to the executor, benefiting from the agent's authentication layer. + * @example 'http://localhost:4001' + */ + workflowExecutorUrl?: string | null; }; export type AgentOptionsWithDefaults = Readonly>; diff --git a/packages/agent/src/utils/options-validator.ts b/packages/agent/src/utils/options-validator.ts index 3178192798..a6fa72548b 100644 --- a/packages/agent/src/utils/options-validator.ts +++ b/packages/agent/src/utils/options-validator.ts @@ -38,6 +38,7 @@ export default class OptionsValidator { copyOptions.loggerLevel = copyOptions.loggerLevel || 'Info'; copyOptions.skipSchemaUpdate = copyOptions.skipSchemaUpdate || false; copyOptions.instantCacheRefresh = copyOptions.instantCacheRefresh ?? true; + copyOptions.workflowExecutorUrl = copyOptions.workflowExecutorUrl ?? null; copyOptions.maxBodySize = copyOptions.maxBodySize || '50mb'; copyOptions.bodyParserOptions = copyOptions.bodyParserOptions || { jsonLimit: '50mb', diff --git a/packages/agent/test/__factories__/forest-admin-http-driver-options.ts b/packages/agent/test/__factories__/forest-admin-http-driver-options.ts index bf64613f23..5189d2b9a4 100644 --- a/packages/agent/test/__factories__/forest-admin-http-driver-options.ts +++ b/packages/agent/test/__factories__/forest-admin-http-driver-options.ts @@ -29,4 +29,5 @@ export default Factory.define(() => ({ }, ignoreMissingSchemaElementErrors: false, useUnsafeActionEndpoint: false, + workflowExecutorUrl: null, })); diff --git a/packages/agent/test/__factories__/router.ts b/packages/agent/test/__factories__/router.ts index 226a1bf2ab..78cb0e55ba 100644 --- a/packages/agent/test/__factories__/router.ts +++ b/packages/agent/test/__factories__/router.ts @@ -7,6 +7,7 @@ export class RouterFactory extends Factory { router.get = jest.fn(); router.delete = jest.fn(); router.use = jest.fn(); + router.patch = jest.fn(); router.post = jest.fn(); router.put = jest.fn(); router.all = jest.fn(); diff --git a/packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts b/packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts new file mode 100644 index 0000000000..0162373d68 --- /dev/null +++ b/packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts @@ -0,0 +1,199 @@ +import { createMockContext } from '@shopify/jest-koa-mocks'; +import http from 'http'; + +import WorkflowExecutorProxyRoute from '../../../src/routes/workflow/workflow-executor-proxy'; +import { RouteType } from '../../../src/types'; +import * as factories from '../../__factories__'; + +describe('WorkflowExecutorProxyRoute', () => { + const services = factories.forestAdminHttpDriverServices.build(); + const router = factories.router.mockAllMethods().build(); + + let executorServer: http.Server; + let executorPort: number; + + // Start a real HTTP server to act as the workflow executor + beforeAll(async () => { + executorServer = http.createServer((req, res) => { + const chunks: Uint8Array[] = []; + req.on('data', chunk => chunks.push(chunk)); + req.on('end', () => { + const body = Buffer.concat(chunks).toString('utf-8'); + + res.setHeader('Content-Type', 'application/json'); + + if (req.url?.includes('not-found')) { + res.writeHead(404); + res.end(JSON.stringify({ error: 'Run not found or unavailable' })); + } else if (req.method === 'GET' && req.url?.match(/^\/runs\/[\w-]+$/)) { + res.writeHead(200); + res.end(JSON.stringify({ steps: [{ stepId: 's1', status: 'success' }] })); + } else if (req.method === 'POST' && req.url?.match(/^\/runs\/[\w-]+\/trigger$/)) { + res.writeHead(200); + res.end(JSON.stringify({ triggered: true })); + } else if ( + req.method === 'PATCH' && + req.url?.match(/^\/runs\/[\w-]+\/steps\/\d+\/pending-data$/) + ) { + const parsed = body ? JSON.parse(body) : {}; + res.writeHead(200); + res.end(JSON.stringify({ updated: true, received: parsed })); + } else { + res.writeHead(404); + res.end(JSON.stringify({ error: 'Not found' })); + } + }); + }); + + await new Promise((resolve, reject) => { + executorServer.listen(0, () => { + executorPort = (executorServer.address() as { port: number }).port; + resolve(); + }); + executorServer.on('error', reject); + }); + }); + + afterAll(async () => { + await new Promise((resolve, reject) => { + executorServer.close(err => (err ? reject(err) : resolve())); + }); + }); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + const buildOptions = (url: string) => + factories.forestAdminHttpDriverOptions.build({ workflowExecutorUrl: url }); + + describe('constructor', () => { + test('should have RouteType.PrivateRoute', () => { + const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001')); + + expect(route.type).toBe(RouteType.PrivateRoute); + }); + }); + + describe('setupRoutes', () => { + test('should register GET, POST and PATCH routes', () => { + const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001')); + route.setupRoutes(router); + + expect(router.get).toHaveBeenCalledWith( + '/_internal/workflow-executions/:runId', + expect.any(Function), + ); + expect(router.post).toHaveBeenCalledWith( + '/_internal/workflow-executions/:runId/trigger', + expect.any(Function), + ); + expect(router.patch).toHaveBeenCalledWith( + '/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data', + expect.any(Function), + ); + }); + }); + + describe('handleProxy', () => { + test('should forward GET /runs/:runId and return executor response', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + customProperties: { params: { runId: 'run-123' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-123', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(200); + expect(context.response.body).toEqual({ + steps: [{ stepId: 's1', status: 'success' }], + }); + }); + + test('should forward POST /runs/:runId/trigger and return executor response', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + method: 'POST', + customProperties: { params: { runId: 'run-456' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-456/trigger', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(200); + expect(context.response.body).toEqual({ triggered: true }); + }); + + test('should forward error status from executor', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + customProperties: { params: { runId: 'not-found' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/not-found', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(404); + expect(context.response.body).toEqual({ error: 'Run not found or unavailable' }); + }); + + test('should forward PATCH pending-data and pass request body', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + method: 'PATCH', + customProperties: { params: { runId: 'run-789', stepIndex: '2' } }, + requestBody: { fieldValues: { name: 'updated' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-789/steps/2/pending-data', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(200); + expect(context.response.body).toEqual({ + updated: true, + received: { fieldValues: { name: 'updated' } }, + }); + }); + + test('should reject when executor is unreachable', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions('http://localhost:1'), // port that should be unreachable + ); + + const context = createMockContext({ + customProperties: { params: { runId: 'run-789' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-789', + }); + + await expect((route as any).handleProxy(context)).rejects.toThrow(); + }); + }); +}); diff --git a/packages/ai-proxy/package.json b/packages/ai-proxy/package.json index 4fa5da943c..b7f1adf722 100644 --- a/packages/ai-proxy/package.json +++ b/packages/ai-proxy/package.json @@ -16,7 +16,7 @@ "@forestadmin/datasource-toolkit": "1.53.1", "@langchain/anthropic": "1.3.17", "@langchain/community": "^1.1.19", - "@langchain/core": "1.1.15", + "@langchain/core": "1.1.39", "@langchain/langgraph": "^1.1.0", "@langchain/mcp-adapters": "1.1.1", "@langchain/openai": "1.2.5", diff --git a/packages/workflow-executor/example/README.md b/packages/workflow-executor/example/README.md index 42e99c774e..f430981d9d 100644 --- a/packages/workflow-executor/example/README.md +++ b/packages/workflow-executor/example/README.md @@ -1,15 +1,76 @@ # Workflow Executor — Example -Minimal setup to run a workflow executor backed by PostgreSQL. +Two ways to run the executor: **mock mode** (no external services) or **production mode** (real PostgreSQL, AI provider, and Forest Admin agent). -## Prerequisites +--- + +## Mock Mode + +Run the executor with in-memory stores and mock ports. No database, no AI provider, no running agent required. + +### Quick start + +```bash +cd packages/workflow-executor +npx tsx example/mock.ts +``` + +The mock executor will: +- Poll for pending steps every 3 seconds +- Walk through 7 step types: condition, read-record, update-record, trigger-action, load-related-record, mcp, guidance +- Log every port call (agent, orchestrator, AI) to the console + +### Endpoints + +| Method | Path | Description | +|--------|-------------------------|----------------------------------| +| GET | `/health` | Returns `{ "state": "running" }` | +| GET | `/runs/run-1` | Current run state | +| POST | `/runs/run-1/trigger` | Trigger the next pending step | + +All endpoints except `/health` require a JWT bearer token: + +```bash +# Generate a token +TOKEN=$(node -e "console.log(require('jsonwebtoken').sign({id:1},'mock-auth-secret'))") + +# Health check (no auth) +curl http://localhost:3400/health + +# Get run state +curl -H "Authorization: Bearer $TOKEN" http://localhost:3400/runs/run-1 + +# Trigger pending step (for steps that return awaiting-input) +curl -X POST -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"pendingData":{"userConfirmed":true}}' \ + http://localhost:3400/runs/run-1/trigger +``` + +### Execution flow + +Steps 0 (condition) and 1 (read-record) execute automatically on the first poll cycle. Step 2 (update-record) returns `awaiting-input` because `automaticExecution` is not set -- use the `/trigger` endpoint to confirm it. Steps 3 (trigger-action) and 5 (mcp) also pause for confirmation. Step 4 (load-related-record) has `automaticExecution: true` and runs automatically. Step 6 (guidance) requires `pendingData.userInput` via the trigger endpoint. + +### Modifying the scenario + +All mock data lives in `example/scenario.ts`: +- `SCHEMAS` -- collection schemas returned by the mock workflow port +- `RECORDS` -- record data returned by the mock agent port +- `STEPS` -- the step sequence the mock orchestrator dispatches +- `AI_RESPONSES` -- canned AI tool calls keyed by step index + +--- + +## Production Mode + +Full setup backed by PostgreSQL, a real AI provider, and a running Forest Admin agent. + +### Prerequisites - Docker - Node.js 18+ - A running Forest Admin agent (the executor proxies record operations to it) -## Quick start - ### 1. Start PostgreSQL ```bash @@ -25,9 +86,9 @@ cp .env.example .env Fill in your secrets in `.env`: -- `FOREST_ENV_SECRET` / `FOREST_AUTH_SECRET` — from your Forest Admin project settings -- `AGENT_URL` — URL of your running Forest Admin agent -- `AI_API_KEY` — your AI provider API key +- `FOREST_ENV_SECRET` / `FOREST_AUTH_SECRET` -- from your Forest Admin project settings +- `AGENT_URL` -- URL of your running Forest Admin agent +- `AI_API_KEY` -- your AI provider API key ### 3. Run the executor @@ -47,7 +108,7 @@ The executor will: - Poll the Forest Admin orchestrator for pending steps every 5 seconds - Execute steps locally and report results back -## Teardown +### Teardown ```bash docker compose down -v diff --git a/packages/workflow-executor/example/mock-agent-port.ts b/packages/workflow-executor/example/mock-agent-port.ts new file mode 100644 index 0000000000..075c9202d0 --- /dev/null +++ b/packages/workflow-executor/example/mock-agent-port.ts @@ -0,0 +1,58 @@ +import type { + AgentPort, + ExecuteActionQuery, + GetRecordQuery, + GetRelatedDataQuery, + UpdateRecordQuery, +} from '../src/ports/agent-port'; +import type { StepUser } from '../src/types/execution'; +import type { RecordData } from '../src/types/record'; + +import { RECORDS } from './scenario'; + +export default class MockAgentPort implements AgentPort { + // eslint-disable-next-line no-console, @typescript-eslint/no-unused-vars + async getRecord(query: GetRecordQuery, _user: StepUser): Promise { + const record = query.collection === 'orders' ? RECORDS.order : RECORDS.customer; + + // eslint-disable-next-line no-console + console.log( + ` [agent] getRecord(${query.collection}, #${query.id}) -> fields: ${ + query.fields?.join(', ') || 'all' + }`, + ); + + return record; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async updateRecord(query: UpdateRecordQuery, _user: StepUser): Promise { + const record = query.collection === 'orders' ? RECORDS.order : RECORDS.customer; + const updated = { ...record, values: { ...record.values, ...query.values } }; + + // eslint-disable-next-line no-console + console.log( + ` [agent] updateRecord(${query.collection}, #${query.id}) -> ${JSON.stringify( + query.values, + )}`, + ); + + return updated; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async getRelatedData(query: GetRelatedDataQuery, _user: StepUser): Promise { + // eslint-disable-next-line no-console + console.log(` [agent] getRelatedData(${query.collection}, #${query.id}, ${query.relation})`); + + return [RECORDS.order]; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async executeAction(query: ExecuteActionQuery, _user: StepUser): Promise { + // eslint-disable-next-line no-console + console.log(` [agent] executeAction(${query.collection}, ${query.action})`); + + return { success: true, message: `Action '${query.action}' executed successfully` }; + } +} diff --git a/packages/workflow-executor/example/mock-ai-model-port.ts b/packages/workflow-executor/example/mock-ai-model-port.ts new file mode 100644 index 0000000000..f7f5b5260d --- /dev/null +++ b/packages/workflow-executor/example/mock-ai-model-port.ts @@ -0,0 +1,84 @@ +import type { AiModelPort } from '../src/ports/ai-model-port'; +import type { McpConfiguration, RemoteTool } from '@forestadmin/ai-proxy'; +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; + +// eslint-disable-next-line import/no-extraneous-dependencies +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { z } from 'zod'; + +import { AI_RESPONSES } from './scenario'; + +function createMockModel(stepIndex: number): BaseChatModel { + const response = AI_RESPONSES[stepIndex]; + + const invoke = async () => { + if (!response) return { tool_calls: undefined }; + + return { + tool_calls: [{ name: response.toolName, args: response.args, id: `call-${stepIndex}` }], + }; + }; + + // Minimal mock that satisfies BaseChatModel usage in base-step-executor: + // model.bindTools(tools, opts) returns model-like object with invoke() + const model = { + invoke, + bindTools() { + return this; + }, + }; + + return model as unknown as BaseChatModel; +} + +function createMockRemoteTool(): RemoteTool { + const tool = new DynamicStructuredTool({ + name: 'mock_search', + description: 'Search for information about a topic', + schema: z.object({ query: z.string().describe('Search query') }), + func: async (input: { query: string }) => { + // eslint-disable-next-line no-console + console.log(` [mcp] mock_search("${input.query}")`); + + return JSON.stringify({ + results: [{ title: 'Customer Profile', snippet: 'VIP customer since 2024' }], + }); + }, + }); + + return { + base: tool, + sourceId: 'mock-server', + sourceType: 'mcp-server', + get sanitizedName() { + return tool.name; + }, + } as unknown as RemoteTool; +} + +/** + * Mock AI model port that returns deterministic responses from AI_RESPONSES. + * Reads the current step index from an external getter (linked to MockWorkflowPort) + * so re-polling the same step always returns the same AI response. + */ +export default class MockAiModelPort implements AiModelPort { + private readonly getCurrentStepIndex: () => number; + + constructor(getCurrentStepIndex: () => number) { + this.getCurrentStepIndex = getCurrentStepIndex; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + getModel(_aiConfigName?: string): BaseChatModel { + return createMockModel(this.getCurrentStepIndex()); + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async loadRemoteTools(_config: McpConfiguration): Promise { + return [createMockRemoteTool()]; + } + + async closeConnections(): Promise { + // no-op + } +} diff --git a/packages/workflow-executor/example/mock-workflow-port.ts b/packages/workflow-executor/example/mock-workflow-port.ts new file mode 100644 index 0000000000..8c7e800a80 --- /dev/null +++ b/packages/workflow-executor/example/mock-workflow-port.ts @@ -0,0 +1,74 @@ +import type { WorkflowPort } from '../src/ports/workflow-port'; +import type { PendingStepExecution, Step, StepUser } from '../src/types/execution'; +import type { StepOutcome } from '../src/types/step-outcome'; + +import { SCHEMAS, STEPS } from './scenario'; + +export default class MockWorkflowPort implements WorkflowPort { + currentStepIndex = 0; + private completedSteps: Step[] = []; + + async getPendingStepExecutions(): Promise { + if (this.currentStepIndex >= STEPS.length) return []; + + return [this.getCurrentStep()]; + } + + async getPendingStepExecutionsForRun(runId: string): Promise { + if (runId !== 'run-1' || this.currentStepIndex >= STEPS.length) return null; + + return this.getCurrentStep(); + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async updateStepExecution(_runId: string, outcome: StepOutcome): Promise { + const step = STEPS[this.currentStepIndex]; + + // eslint-disable-next-line no-console + console.log( + ` [orchestrator] Step ${outcome.stepIndex} (${step?.stepDefinition.type}): ${outcome.status}`, + ); + + if (outcome.status === 'success') { + this.completedSteps.push({ + stepDefinition: step.stepDefinition, + stepOutcome: outcome, + }); + this.currentStepIndex += 1; + + if (this.currentStepIndex >= STEPS.length) { + // eslint-disable-next-line no-console + console.log('\n Workflow completed!\n'); + } + } else if (outcome.status === 'awaiting-input') { + // eslint-disable-next-line no-console + console.log(' Waiting for frontend trigger...'); + } else if (outcome.status === 'error') { + // eslint-disable-next-line no-console + console.log(` Error: ${outcome.error}`); + } + } + + async getCollectionSchema(collectionName: string) { + const schema = SCHEMAS[collectionName]; + if (!schema) throw new Error(`Unknown collection: ${collectionName}`); + + return schema; + } + + async getMcpServerConfigs() { + // Return a dummy config so the Runner calls loadRemoteTools on the mock AiModelPort + return [{ configs: { 'mock-server': { type: 'http' as const, url: 'http://mock' } } }]; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async hasRunAccess(_runId: string, _user: StepUser) { + return true; + } + + private getCurrentStep(): PendingStepExecution { + const step = STEPS[this.currentStepIndex]; + + return { ...step, previousSteps: [...this.completedSteps] }; + } +} diff --git a/packages/workflow-executor/example/mock.ts b/packages/workflow-executor/example/mock.ts new file mode 100644 index 0000000000..13b77cb2ed --- /dev/null +++ b/packages/workflow-executor/example/mock.ts @@ -0,0 +1,60 @@ +/* eslint-disable no-console */ +import MockAgentPort from './mock-agent-port'; +import MockAiModelPort from './mock-ai-model-port'; +import MockWorkflowPort from './mock-workflow-port'; +import ExecutorHttpServer from '../src/http/executor-http-server'; +import Runner from '../src/runner'; +import SchemaCache from '../src/schema-cache'; +import InMemoryStore from '../src/stores/in-memory-store'; + +const PORT = Number(process.env.PORT) || 3400; +const AUTH_SECRET = 'mock-auth-secret'; + +async function main() { + const workflowPort = new MockWorkflowPort(); + const agentPort = new MockAgentPort(); + const aiModelPort = new MockAiModelPort(() => workflowPort.currentStepIndex); + + const runner = new Runner({ + workflowPort, + agentPort, + aiModelPort, + runStore: new InMemoryStore(), + schemaCache: new SchemaCache(), + pollingIntervalMs: 3000, + envSecret: 'a'.repeat(64), + authSecret: AUTH_SECRET, + }); + + const server = new ExecutorHttpServer({ + port: PORT, + runner, + authSecret: AUTH_SECRET, + workflowPort, + }); + + await runner.start(); + await server.start(); + + console.log(`\n Mock Workflow Executor running on http://localhost:${PORT}`); + console.log(` Auth secret: ${AUTH_SECRET}`); + console.log( + ` Generate JWT: node -e "console.log(require('jsonwebtoken').sign({id:1},'${AUTH_SECRET}'))"`, + ); + console.log(`\n Endpoints:`); + console.log(` GET /health`); + console.log(` GET /runs/run-1`); + console.log(` POST /runs/run-1/trigger\n`); + + const shutdown = async () => { + console.log('\nShutting down...'); + await runner.stop(); + await server.stop(); + process.exit(0); + }; + + process.on('SIGTERM', shutdown); + process.on('SIGINT', shutdown); +} + +main().catch(console.error); diff --git a/packages/workflow-executor/example/scenario.ts b/packages/workflow-executor/example/scenario.ts new file mode 100644 index 0000000000..149c50016c --- /dev/null +++ b/packages/workflow-executor/example/scenario.ts @@ -0,0 +1,183 @@ +import type { + CollectionSchema, + PendingStepExecution, + RecordData, + RecordRef, + StepUser, +} from '../src'; + +import { StepType } from '../src'; + +// -- Schemas -- +export const SCHEMAS: Record = { + customers: { + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'id', displayName: 'Id', isRelationship: false }, + { fieldName: 'email', displayName: 'Email', isRelationship: false }, + { fieldName: 'name', displayName: 'Full Name', isRelationship: false }, + { fieldName: 'status', displayName: 'Status', isRelationship: false }, + { + fieldName: 'order', + displayName: 'Order', + isRelationship: true, + relationType: 'BelongsTo', + relatedCollectionName: 'orders', + }, + ], + actions: [ + { + name: 'send-welcome-email', + displayName: 'Send Welcome Email', + endpoint: '/forest/actions/send-welcome-email', + }, + ], + }, + orders: { + collectionName: 'orders', + collectionDisplayName: 'Orders', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'id', displayName: 'Id', isRelationship: false }, + { fieldName: 'total', displayName: 'Total', isRelationship: false }, + { fieldName: 'date', displayName: 'Date', isRelationship: false }, + ], + actions: [], + }, +}; + +// -- Records -- +export const RECORDS: Record = { + customer: { + collectionName: 'customers', + recordId: [42], + values: { id: 42, email: 'john@acme.com', name: 'John Doe', status: 'pending' }, + }, + order: { + collectionName: 'orders', + recordId: [99], + values: { id: 99, total: 150, date: '2026-03-15' }, + }, +}; + +// -- User -- +export const USER: StepUser = { + id: 1, + email: 'admin@acme.com', + firstName: 'Admin', + lastName: 'User', + team: 'Operations', + renderingId: 1, + role: 'admin', + permissionLevel: 'admin', + tags: {}, +}; + +const BASE_REF: RecordRef = { collectionName: 'customers', recordId: [42], stepIndex: 0 }; + +// -- Steps (7 types) -- +export const STEPS: Omit[] = [ + { + + runId: 'run-1', + stepId: 'step-0', + stepIndex: 0, + baseRecordRef: BASE_REF, + user: USER, + stepDefinition: { + type: StepType.Condition, + prompt: 'Is this a VIP customer?', + options: ['Yes', 'No'], + }, + }, + { + + runId: 'run-1', + stepId: 'step-1', + stepIndex: 1, + baseRecordRef: BASE_REF, + user: USER, + stepDefinition: { type: StepType.ReadRecord, prompt: 'Read the customer email and name' }, + }, + { + + runId: 'run-1', + stepId: 'step-2', + stepIndex: 2, + baseRecordRef: BASE_REF, + user: USER, + stepDefinition: { type: StepType.UpdateRecord, prompt: 'Set the customer status to active' }, + }, + { + + runId: 'run-1', + stepId: 'step-3', + stepIndex: 3, + baseRecordRef: BASE_REF, + user: USER, + stepDefinition: { type: StepType.TriggerAction, prompt: 'Send the welcome email' }, + }, + { + + runId: 'run-1', + stepId: 'step-4', + stepIndex: 4, + baseRecordRef: BASE_REF, + user: USER, + stepDefinition: { + type: StepType.LoadRelatedRecord, + prompt: 'Load the customer order', + automaticExecution: true, + }, + }, + { + + runId: 'run-1', + stepId: 'step-5', + stepIndex: 5, + baseRecordRef: BASE_REF, + user: USER, + stepDefinition: { + type: StepType.Mcp, + prompt: 'Search for related info about this customer', + }, + }, + { + + runId: 'run-1', + stepId: 'step-6', + stepIndex: 6, + baseRecordRef: BASE_REF, + user: USER, + stepDefinition: { + type: StepType.Guidance, + prompt: 'What notes would you like to add about this customer?', + }, + }, +]; + +// -- AI responses (per step index) -- +// Tool names must match what the executors expect +export const AI_RESPONSES: Record }> = { + 0: { + toolName: 'select-option', + args: { option: 'Yes', reasoning: 'Customer has a premium account' }, + }, + 1: { toolName: 'read-selected-record-fields', args: { fieldNames: ['Email', 'Full Name'] } }, + 2: { + toolName: 'update-record-field', + args: { fieldName: 'Status', value: 'active', reasoning: 'Onboarding activation' }, + }, + 3: { + toolName: 'select-action', + args: { actionName: 'Send Welcome Email', reasoning: 'Standard onboarding' }, + }, + 4: { + toolName: 'select-relation', + args: { relationName: 'Order', reasoning: 'Load customer order' }, + }, + 5: { toolName: 'mock_search', args: { query: 'John Doe customer info' } }, + // Step 6 (guidance) -- no AI, front sends userInput directly +}; diff --git a/yarn.lock b/yarn.lock index f485bd5167..a42d0e99fd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2308,20 +2308,21 @@ uuid "^10.0.0" zod "^3.25.76 || ^4" -"@langchain/core@1.1.15": - version "1.1.15" - resolved "https://registry.yarnpkg.com/@langchain/core/-/core-1.1.15.tgz#31a933f4b445101ad1aa441b4f29b74e5994986e" - integrity sha512-b8RN5DkWAmDAlMu/UpTZEluYwCLpm63PPWniRKlE8ie3KkkE7IuMQ38pf4kV1iaiI+d99BEQa2vafQHfCujsRA== +"@langchain/core@1.1.39": + version "1.1.39" + resolved "https://registry.yarnpkg.com/@langchain/core/-/core-1.1.39.tgz#c84dec834a61c4efb328227548a91b2464ad6091" + integrity sha512-DP9c7TREy6iA7HnywstmUAsNyJNYTFpRg2yBfQ+6H0l1HnvQzei9GsQ36GeOLxgRaD3vm9K8urCcawSC7yQpCw== dependencies: "@cfworker/json-schema" "^4.0.2" + "@standard-schema/spec" "^1.1.0" ansi-styles "^5.0.0" camelcase "6" decamelize "1.2.0" js-tiktoken "^1.0.12" - langsmith ">=0.4.0 <1.0.0" + langsmith ">=0.5.0 <1.0.0" mustache "^4.2.0" p-queue "^6.6.2" - uuid "^10.0.0" + uuid "^11.1.0" zod "^3.25.76 || ^4" "@langchain/langgraph-checkpoint@^1.0.1": @@ -4243,7 +4244,7 @@ dependencies: tslib "^2.6.2" -"@standard-schema/spec@1.1.0": +"@standard-schema/spec@1.1.0", "@standard-schema/spec@^1.1.0": version "1.1.0" resolved "https://registry.yarnpkg.com/@standard-schema/spec/-/spec-1.1.0.tgz#a79b55dbaf8604812f52d140b2c9ab41bc150bb8" integrity sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w== @@ -11438,6 +11439,17 @@ koa@^3.0.1: semver "^7.6.3" uuid "^10.0.0" +"langsmith@>=0.5.0 <1.0.0": + version "0.5.16" + resolved "https://registry.yarnpkg.com/langsmith/-/langsmith-0.5.16.tgz#11285033f4c4da4e1dc9c8f1e4765cded0123b8f" + integrity sha512-nSsSnTo3gjg1dnb48vb8i582zyjvtPbn+EpR6P1pNELb+4Hb4R3nt7LDy+Tl1ltw73vPGfJQtUWOl28irI1b5w== + dependencies: + chalk "^5.6.2" + console-table-printer "^2.12.1" + p-queue "^6.6.2" + semver "^7.6.3" + uuid "^10.0.0" + lerna@^8.2.3: version "8.2.3" resolved "https://registry.yarnpkg.com/lerna/-/lerna-8.2.3.tgz#0a9c07eda4cfac84a480b3e66915189ccfb5bd2c" @@ -17507,6 +17519,11 @@ uuid@^10.0.0: resolved "https://registry.yarnpkg.com/uuid/-/uuid-10.0.0.tgz#5a95aa454e6e002725c79055fd42aaba30ca6294" integrity sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ== +uuid@^11.1.0: + version "11.1.0" + resolved "https://registry.yarnpkg.com/uuid/-/uuid-11.1.0.tgz#9549028be1753bb934fc96e2bca09bb4105ae912" + integrity sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A== + uuid@^13.0.0: version "13.0.0" resolved "https://registry.yarnpkg.com/uuid/-/uuid-13.0.0.tgz#263dc341b19b4d755eb8fe36b78d95a6b65707e8"