diff --git a/packages/workflow-executor/package.json b/packages/workflow-executor/package.json index 11401368ee..2b3c5e518b 100644 --- a/packages/workflow-executor/package.json +++ b/packages/workflow-executor/package.json @@ -25,6 +25,7 @@ "dependencies": { "@forestadmin/agent-client": "1.4.18", "@forestadmin/ai-proxy": "1.7.2", + "@langchain/openai": "1.2.5", "@forestadmin/forestadmin-client": "1.38.2", "@koa/bodyparser": "^6.1.0", "@koa/router": "^13.1.0", diff --git a/packages/workflow-executor/src/adapters/ai-client-adapter.ts b/packages/workflow-executor/src/adapters/ai-client-adapter.ts new file mode 100644 index 0000000000..c88a9d3218 --- /dev/null +++ b/packages/workflow-executor/src/adapters/ai-client-adapter.ts @@ -0,0 +1,27 @@ +import type { AiModelPort } from '../ports/ai-model-port'; +import type { PendingStepExecution } from '../types/execution'; +import type { AiConfiguration, McpConfiguration, RemoteTool } from '@forestadmin/ai-proxy'; +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; + +import { AiClient } from '@forestadmin/ai-proxy'; + +export default class AiClientAdapter implements AiModelPort { + private readonly aiClient: AiClient; + + constructor(aiConfigurations: AiConfiguration[]) { + const withRetries = aiConfigurations.map(c => ({ maxRetries: 2, ...c })); + this.aiClient = new AiClient({ aiConfigurations: withRetries as AiConfiguration[] }); + } + + getModel(step: PendingStepExecution): BaseChatModel { + return this.aiClient.getModel(step.stepDefinition.aiConfigName); + } + + loadRemoteTools(config: McpConfiguration): Promise { + return this.aiClient.loadRemoteTools(config); + } + + closeConnections(): Promise { + return this.aiClient.closeConnections(); + } +} diff --git a/packages/workflow-executor/src/adapters/server-ai-adapter.ts b/packages/workflow-executor/src/adapters/server-ai-adapter.ts new file mode 100644 index 0000000000..f4f0b729bc --- /dev/null +++ b/packages/workflow-executor/src/adapters/server-ai-adapter.ts @@ -0,0 +1,54 @@ +import type { AiModelPort } from '../ports/ai-model-port'; +import type { PendingStepExecution } from '../types/execution'; +import type { McpConfiguration, RemoteTool } from '@forestadmin/ai-proxy'; +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; + +import { AiClient } from '@forestadmin/ai-proxy'; +import { ChatOpenAI } from '@langchain/openai'; +import jsonwebtoken from 'jsonwebtoken'; + +export interface ServerAiAdapterOptions { + forestServerUrl: string; + envSecret: string; +} + +export default class ServerAiAdapter implements AiModelPort { + private readonly forestServerUrl: string; + private readonly envSecret: string; + private readonly aiClient: AiClient; + + constructor(options: ServerAiAdapterOptions) { + this.forestServerUrl = options.forestServerUrl; + this.envSecret = options.envSecret; + this.aiClient = new AiClient(); + } + + getModel(step: PendingStepExecution): BaseChatModel { + const jwt = jsonwebtoken.sign( + { envId: step.envId, userId: step.user.id, runId: step.runId, stepIndex: step.stepIndex }, + this.envSecret, + { expiresIn: '1h' }, + ); + + const aiProxyUrl = `${this.forestServerUrl}/liana/v1/ai-proxy`; + + return new ChatOpenAI({ + // Model has no effect — the server uses its own configured model. + // Set here only because ChatOpenAI requires it. + model: 'gpt-4.1', + maxRetries: 2, + configuration: { + apiKey: jwt, + fetch: (_url: RequestInfo | URL, init?: RequestInit) => fetch(aiProxyUrl, init), + }, + }); + } + + loadRemoteTools(config: McpConfiguration): Promise { + return this.aiClient.loadRemoteTools(config); + } + + closeConnections(): Promise { + return this.aiClient.closeConnections(); + } +} diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index f09c01b9d8..3a6d96d107 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -3,12 +3,13 @@ import type { RunnerState } from './runner'; import type { AiConfiguration } from '@forestadmin/ai-proxy'; import type { Options as SequelizeOptions } from 'sequelize'; -import { AiClient } from '@forestadmin/ai-proxy'; import { Sequelize } from 'sequelize'; import AgentClientAgentPort from './adapters/agent-client-agent-port'; +import AiClientAdapter from './adapters/ai-client-adapter'; import ConsoleLogger from './adapters/console-logger'; import ForestServerWorkflowPort from './adapters/forest-server-workflow-port'; +import ServerAiAdapter from './adapters/server-ai-adapter'; import ExecutorHttpServer from './http/executor-http-server'; import Runner from './runner'; import SchemaCache from './schema-cache'; @@ -31,7 +32,7 @@ export interface ExecutorOptions { agentUrl: string; httpPort: number; forestServerUrl?: string; - aiConfigurations: AiConfiguration[]; + aiConfigurations?: AiConfiguration[]; pollingIntervalMs?: number; logger?: Logger; stopTimeoutMs?: number; @@ -49,9 +50,9 @@ function buildCommonDependencies(options: ExecutorOptions) { forestServerUrl, }); - const aiClient = new AiClient({ - aiConfigurations: options.aiConfigurations, - }); + const aiModelPort = options.aiConfigurations?.length + ? new AiClientAdapter(options.aiConfigurations) + : new ServerAiAdapter({ forestServerUrl, envSecret: options.envSecret }); const schemaCache = new SchemaCache(); @@ -65,7 +66,7 @@ function buildCommonDependencies(options: ExecutorOptions) { agentPort, schemaCache, workflowPort, - aiClient, + aiModelPort, logger, pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS, envSecret: options.envSecret, diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index b9b32f8b80..ed0cf1249f 100644 --- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -1,4 +1,5 @@ import type { AgentPort } from '../ports/agent-port'; +import type { AiModelPort } from '../ports/ai-model-port'; import type { Logger } from '../ports/logger-port'; import type { RunStore } from '../ports/run-store'; import type { WorkflowPort } from '../ports/workflow-port'; @@ -17,7 +18,7 @@ import type { TriggerActionStepDefinition, UpdateRecordStepDefinition, } from '../types/step-definition'; -import type { AiClient, RemoteTool } from '@forestadmin/ai-proxy'; +import type { RemoteTool } from '@forestadmin/ai-proxy'; import { StepStateError, causeMessage } from '../errors'; import ConditionStepExecutor from './condition-step-executor'; @@ -30,7 +31,7 @@ import { StepType } from '../types/step-definition'; import { stepTypeToOutcomeType } from '../types/step-outcome'; export interface StepContextConfig { - aiClient: AiClient; + aiModelPort: AiModelPort; agentPort: AgentPort; workflowPort: WorkflowPort; runStore: RunStore; @@ -104,7 +105,7 @@ export default class StepExecutorFactory { ): ExecutionContext { return { ...step, - model: cfg.aiClient.getModel(step.stepDefinition.aiConfigName), + model: cfg.aiModelPort.getModel(step), agentPort: cfg.agentPort, workflowPort: cfg.workflowPort, runStore: cfg.runStore, diff --git a/packages/workflow-executor/src/ports/ai-model-port.ts b/packages/workflow-executor/src/ports/ai-model-port.ts new file mode 100644 index 0000000000..76b0c2f9d1 --- /dev/null +++ b/packages/workflow-executor/src/ports/ai-model-port.ts @@ -0,0 +1,9 @@ +import type { PendingStepExecution } from '../types/execution'; +import type { McpConfiguration, RemoteTool } from '@forestadmin/ai-proxy'; +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; + +export interface AiModelPort { + getModel(step: PendingStepExecution): BaseChatModel; + loadRemoteTools(config: McpConfiguration): Promise; + closeConnections(): Promise; +} diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 470c55a3a5..b661392716 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -1,12 +1,13 @@ import type { StepContextConfig } from './executors/step-executor-factory'; import type { AgentPort } from './ports/agent-port'; +import type { AiModelPort } from './ports/ai-model-port'; import type { Logger } from './ports/logger-port'; import type { RunStore } from './ports/run-store'; import type { McpConfiguration, WorkflowPort } from './ports/workflow-port'; import type SchemaCache from './schema-cache'; import type { PendingStepExecution, StepExecutionResult } from './types/execution'; import type { StepExecutionData } from './types/step-execution-data'; -import type { AiClient, RemoteTool } from '@forestadmin/ai-proxy'; +import type { RemoteTool } from '@forestadmin/ai-proxy'; import ConsoleLogger from './adapters/console-logger'; import { @@ -28,7 +29,7 @@ export interface RunnerConfig { runStore: RunStore; schemaCache: SchemaCache; pollingIntervalMs: number; - aiClient: AiClient; + aiModelPort: AiModelPort; envSecret: string; authSecret: string; logger?: Logger; @@ -125,7 +126,7 @@ export default class Runner { // Close resources — log failures instead of silently swallowing const results = await Promise.allSettled([ - this.config.aiClient.closeConnections(), + this.config.aiModelPort.closeConnections(), this.config.runStore.close(this.logger), ]); @@ -222,7 +223,7 @@ export default class Runner { configs: Object.assign({}, ...configs.map(c => c.configs)), }; - return this.config.aiClient.loadRemoteTools(mergedConfig); + return this.config.aiModelPort.loadRemoteTools(mergedConfig); } private executeStep(step: PendingStepExecution): Promise { @@ -269,7 +270,7 @@ export default class Runner { private get contextConfig(): StepContextConfig { return { - aiClient: this.config.aiClient, + aiModelPort: this.config.aiModelPort, agentPort: this.config.agentPort, workflowPort: this.config.workflowPort, runStore: this.config.runStore, diff --git a/packages/workflow-executor/src/types/execution.ts b/packages/workflow-executor/src/types/execution.ts index a3cc2d775e..7251928081 100644 --- a/packages/workflow-executor/src/types/execution.ts +++ b/packages/workflow-executor/src/types/execution.ts @@ -28,6 +28,7 @@ export interface Step { } export interface PendingStepExecution { + readonly envId: string; readonly runId: string; readonly stepId: string; readonly stepIndex: number; diff --git a/packages/workflow-executor/test/adapters/ai-client-adapter.test.ts b/packages/workflow-executor/test/adapters/ai-client-adapter.test.ts new file mode 100644 index 0000000000..5c0381ec9b --- /dev/null +++ b/packages/workflow-executor/test/adapters/ai-client-adapter.test.ts @@ -0,0 +1,81 @@ +import type { PendingStepExecution } from '../../src/types/execution'; + +import AiClientAdapter from '../../src/adapters/ai-client-adapter'; +import { StepType } from '../../src/types/step-definition'; + +const mockGetModel = jest.fn().mockReturnValue({ invoke: jest.fn() }); +const mockLoadRemoteTools = jest.fn().mockResolvedValue([]); +const mockCloseConnections = jest.fn().mockResolvedValue(undefined); + +jest.mock('@forestadmin/ai-proxy', () => ({ + AiClient: jest.fn().mockImplementation(() => ({ + getModel: mockGetModel, + loadRemoteTools: mockLoadRemoteTools, + closeConnections: mockCloseConnections, + })), +})); + +function makeStep(overrides: Partial = {}): PendingStepExecution { + return { + envId: 'env-1', + runId: 'run-1', + stepId: 'step-1', + stepIndex: 0, + baseRecordRef: { collectionName: 'customers', recordId: ['1'], stepIndex: 0 }, + stepDefinition: { type: StepType.ReadRecord, aiConfigName: 'my-config' }, + previousSteps: [], + user: { + id: 1, + email: 'test@example.com', + firstName: 'Test', + lastName: 'User', + team: 'admin', + renderingId: 1, + role: 'admin', + permissionLevel: 'admin', + tags: {}, + }, + ...overrides, + }; +} + +describe('AiClientAdapter', () => { + beforeEach(() => jest.clearAllMocks()); + + it('delegates getModel to AiClient with aiConfigName from step definition', () => { + const adapter = new AiClientAdapter([ + { name: 'my-config', provider: 'openai' as const, model: 'gpt-4o', apiKey: 'sk-test' }, + ]); + + adapter.getModel(makeStep()); + + expect(mockGetModel).toHaveBeenCalledWith('my-config'); + }); + + it('delegates getModel without aiConfigName when not set', () => { + const adapter = new AiClientAdapter([ + { name: 'default', provider: 'openai' as const, model: 'gpt-4o', apiKey: 'sk-test' }, + ]); + + adapter.getModel(makeStep({ stepDefinition: { type: StepType.ReadRecord } })); + + expect(mockGetModel).toHaveBeenCalledWith(undefined); + }); + + it('delegates loadRemoteTools to AiClient', async () => { + const adapter = new AiClientAdapter([]); + const config = { configs: {} }; + + await adapter.loadRemoteTools(config); + + expect(mockLoadRemoteTools).toHaveBeenCalledWith(config); + }); + + it('delegates closeConnections to AiClient', async () => { + const adapter = new AiClientAdapter([]); + + await adapter.closeConnections(); + + expect(mockCloseConnections).toHaveBeenCalled(); + }); +}); diff --git a/packages/workflow-executor/test/adapters/server-ai-adapter.test.ts b/packages/workflow-executor/test/adapters/server-ai-adapter.test.ts new file mode 100644 index 0000000000..e9f544d881 --- /dev/null +++ b/packages/workflow-executor/test/adapters/server-ai-adapter.test.ts @@ -0,0 +1,139 @@ +import type { PendingStepExecution } from '../../src/types/execution'; + +import jsonwebtoken from 'jsonwebtoken'; + +import ServerAiAdapter from '../../src/adapters/server-ai-adapter'; +import { StepType } from '../../src/types/step-definition'; + +jest.mock('@forestadmin/ai-proxy', () => ({ + AiClient: jest.fn().mockImplementation(() => ({ + loadRemoteTools: jest.fn().mockResolvedValue([]), + closeConnections: jest.fn().mockResolvedValue(undefined), + })), +})); + +jest.mock('@langchain/openai', () => ({ + ChatOpenAI: jest.fn().mockImplementation((opts: Record) => ({ + mockOpts: opts, + })), +})); + +const ENV_SECRET = 'a'.repeat(64); + +function makeStep(overrides: Partial = {}): PendingStepExecution { + return { + envId: 'env-42', + runId: 'run-1', + stepId: 'step-1', + stepIndex: 0, + baseRecordRef: { collectionName: 'customers', recordId: ['1'], stepIndex: 0 }, + stepDefinition: { type: StepType.ReadRecord }, + previousSteps: [], + user: { + id: 7, + email: 'test@example.com', + firstName: 'Test', + lastName: 'User', + team: 'admin', + renderingId: 1, + role: 'admin', + permissionLevel: 'admin', + tags: {}, + }, + ...overrides, + }; +} + +describe('ServerAiAdapter', () => { + const adapter = new ServerAiAdapter({ + forestServerUrl: 'https://api.forestadmin.com', + envSecret: ENV_SECRET, + }); + + describe('getModel', () => { + it('returns a ChatOpenAI with baseURL pointing to the FA server', () => { + const model = adapter.getModel(makeStep()) as unknown as { + mockOpts: Record; + }; + + expect(model.mockOpts).toEqual( + expect.objectContaining({ + model: 'gpt-4.1', + maxRetries: 2, + configuration: expect.objectContaining({ + fetch: expect.any(Function), + }), + }), + ); + }); + + it('forges a JWT signed with envSecret containing step context', () => { + const step = makeStep({ envId: 'env-99', runId: 'run-abc', stepIndex: 3 }); + const model = adapter.getModel(step) as unknown as { mockOpts: Record }; + + const { apiKey } = model.mockOpts.configuration as { apiKey: string }; + const decoded = jsonwebtoken.verify(apiKey, ENV_SECRET) as Record; + + expect(decoded).toEqual( + expect.objectContaining({ + envId: 'env-99', + userId: 7, + runId: 'run-abc', + stepIndex: 3, + }), + ); + }); + + it('provides a fetch function that redirects to /liana/v1/ai-proxy', () => { + const model = adapter.getModel(makeStep()) as unknown as { + mockOpts: Record; + }; + + const { fetch: customFetch } = model.mockOpts.configuration as { + fetch: (url: RequestInfo | URL, init?: RequestInit) => Promise; + }; + + const mockInit = { method: 'POST', body: '{}' } as RequestInit; + const mockResponse = new Response('{}', { status: 200 }); + const originalFetch = global.fetch; + global.fetch = jest.fn().mockResolvedValue(mockResponse); + + customFetch('https://ignored.com/chat/completions', mockInit); + + expect(global.fetch).toHaveBeenCalledWith( + 'https://api.forestadmin.com/liana/v1/ai-proxy', + mockInit, + ); + + global.fetch = originalFetch; + }); + + it('forges a new JWT per call (different tokens for different steps)', () => { + const model1 = adapter.getModel(makeStep({ stepIndex: 0 })) as unknown as { + mockOpts: Record; + }; + const model2 = adapter.getModel(makeStep({ stepIndex: 1 })) as unknown as { + mockOpts: Record; + }; + + const token1 = (model1.mockOpts.configuration as { apiKey: string }).apiKey; + const token2 = (model2.mockOpts.configuration as { apiKey: string }).apiKey; + + expect(token1).not.toBe(token2); + }); + }); + + describe('loadRemoteTools', () => { + it('delegates to internal AiClient', async () => { + const result = await adapter.loadRemoteTools({ configs: {} }); + + expect(result).toEqual([]); + }); + }); + + describe('closeConnections', () => { + it('resolves without error', async () => { + await expect(adapter.closeConnections()).resolves.toBeUndefined(); + }); + }); +}); diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index e89f9b0ef0..41f2ca04ed 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -11,9 +11,9 @@ jest.mock('../src/stores/database-store'); jest.mock('../src/adapters/agent-client-agent-port'); jest.mock('../src/adapters/forest-server-workflow-port'); jest.mock('../src/http/executor-http-server'); -jest.mock('@forestadmin/ai-proxy', () => ({ - AiClient: jest.fn(), -})); +jest.mock('../src/adapters/ai-client-adapter'); +jest.mock('@langchain/openai', () => ({ ChatOpenAI: jest.fn() })); +jest.mock('../src/adapters/server-ai-adapter'); jest.mock('sequelize', () => ({ Sequelize: jest.fn(), })); @@ -91,14 +91,25 @@ describe('buildInMemoryExecutor', () => { ); }); - it('creates AiClient with the provided aiConfigurations', () => { + it('creates AiClientAdapter when aiConfigurations are provided', () => { // eslint-disable-next-line @typescript-eslint/no-var-requires, global-require - const { AiClient } = require('@forestadmin/ai-proxy'); + const AiClientAdapter = require('../src/adapters/ai-client-adapter').default; buildInMemoryExecutor(BASE_OPTIONS); - expect(AiClient).toHaveBeenCalledWith({ - aiConfigurations: BASE_OPTIONS.aiConfigurations, + expect(AiClientAdapter).toHaveBeenCalledWith(BASE_OPTIONS.aiConfigurations); + }); + + it('creates ServerAiAdapter when aiConfigurations is not provided', () => { + // eslint-disable-next-line @typescript-eslint/no-var-requires, global-require + const ServerAiAdapter = require('../src/adapters/server-ai-adapter').default; + + const { aiConfigurations, ...optionsWithoutAi } = BASE_OPTIONS; + buildInMemoryExecutor(optionsWithoutAi); + + expect(ServerAiAdapter).toHaveBeenCalledWith({ + forestServerUrl: 'https://api.forestadmin.com', + envSecret: BASE_OPTIONS.envSecret, }); }); diff --git a/packages/workflow-executor/test/index.test.ts b/packages/workflow-executor/test/index.test.ts index 0274cb9734..585d0c0b72 100644 --- a/packages/workflow-executor/test/index.test.ts +++ b/packages/workflow-executor/test/index.test.ts @@ -1,5 +1,7 @@ import { StepType } from '../src/index'; +jest.mock('@langchain/openai', () => ({ ChatOpenAI: jest.fn() })); + describe('StepType', () => { it('should expose exactly 6 step types', () => { const values = Object.values(StepType); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 943beef79f..3dd7d21150 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -1,8 +1,9 @@ import type { AgentPort } from '../../src/ports/agent-port'; +import type { AiModelPort } from '../../src/ports/ai-model-port'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { PendingStepExecution, StepUser } from '../../src/types/execution'; import type { CollectionSchema } from '../../src/types/record'; -import type { AiClient, BaseChatModel, RemoteTool } from '@forestadmin/ai-proxy'; +import type { BaseChatModel, RemoteTool } from '@forestadmin/ai-proxy'; import jsonwebtoken from 'jsonwebtoken'; import request from 'supertest'; @@ -124,12 +125,12 @@ function createSequentialMockModel( return { invoke, bindTools: jest.fn().mockReturnThis() } as unknown as BaseChatModel; } -function createMockAiClient(model: BaseChatModel): AiClient { +function createMockAiClient(model: BaseChatModel): AiModelPort { return { getModel: jest.fn().mockReturnValue(model), loadRemoteTools: jest.fn().mockResolvedValue([]), closeConnections: jest.fn().mockResolvedValue(undefined), - } as unknown as AiClient; + } as unknown as AiModelPort; } function createMockWorkflowPort(overrides: Partial = {}): jest.Mocked { @@ -169,7 +170,7 @@ function createIntegrationSetup(overrides?: { workflowPort?: jest.Mocked; model?: BaseChatModel; agentPort?: jest.Mocked; - aiClient?: AiClient; + aiClient?: AiModelPort; pollingIntervalMs?: number; }) { const model = overrides?.model ?? createMockModel({ fieldNames: ['Email'] }); @@ -184,7 +185,7 @@ function createIntegrationSetup(overrides?: { workflowPort, runStore, schemaCache, - aiClient, + aiModelPort: aiClient, pollingIntervalMs: overrides?.pollingIntervalMs ?? 60_000, envSecret: ENV_SECRET, authSecret: AUTH_SECRET, @@ -204,6 +205,7 @@ function buildPendingStep( overrides: Partial & Pick, ): PendingStepExecution { return { + envId: 'env-1', runId: 'run-1', stepId: 'step-1', stepIndex: 0, @@ -222,6 +224,7 @@ describe('workflow execution (integration)', () => { it('read-record happy path: trigger → AI selects field → read record → success', async () => { const workflowPort = createMockWorkflowPort({ getPendingStepExecutionsForRun: jest.fn().mockResolvedValue({ + envId: 'env-1', runId: 'run-1', stepId: 'step-1', stepIndex: 0, diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 94b77796b0..12d6008a4f 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1,11 +1,12 @@ import type { StepContextConfig } from '../src/executors/step-executor-factory'; import type { AgentPort } from '../src/ports/agent-port'; +import type { AiModelPort } from '../src/ports/ai-model-port'; import type { Logger } from '../src/ports/logger-port'; import type { RunStore } from '../src/ports/run-store'; import type { WorkflowPort } from '../src/ports/workflow-port'; import type { PendingStepExecution } from '../src/types/execution'; import type { StepDefinition } from '../src/types/step-definition'; -import type { AiClient, BaseChatModel } from '@forestadmin/ai-proxy'; +import type { BaseChatModel } from '@forestadmin/ai-proxy'; import { ConfigurationError, @@ -78,7 +79,7 @@ function createRunnerConfig( overrides: Partial<{ workflowPort: WorkflowPort; runStore: RunStore; - aiClient: AiClient; + aiModelPort: AiModelPort; logger: Logger; envSecret: string; authSecret: string; @@ -96,7 +97,7 @@ function createRunnerConfig( saveStepExecution: jest.fn().mockResolvedValue(undefined), } as unknown as RunStore, pollingIntervalMs: POLLING_INTERVAL_MS, - aiClient: createMockAiClient() as unknown as AiClient, + aiModelPort: createMockAiClient() as unknown as AiModelPort, logger: createMockLogger(), schemaCache: new SchemaCache(), envSecret: VALID_ENV_SECRET, @@ -123,6 +124,7 @@ function makePendingStep( const { stepType = StepType.ReadRecord, ...rest } = overrides; return { + envId: 'env-1', runId: 'run-1', stepId: 'step-1', stepIndex: 0, @@ -280,7 +282,9 @@ describe('graceful shutdown', () => { const aiClient = createMockAiClient(); aiClient.closeConnections.mockRejectedValueOnce(new Error('connection leak')); - runner = new Runner(createRunnerConfig({ logger, aiClient: aiClient as unknown as AiClient })); + runner = new Runner( + createRunnerConfig({ logger, aiModelPort: aiClient as unknown as AiModelPort }), + ); await runner.start(); await runner.stop(); @@ -554,7 +558,7 @@ describe('deduplication', () => { }); runner = new Runner( - createRunnerConfig({ workflowPort, aiClient: aiClient as unknown as AiClient }), + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), ); await runner.triggerPoll('run-1'); @@ -665,7 +669,7 @@ describe('MCP lazy loading (via once thunk)', () => { workflowPort.getPendingStepExecutionsForRun.mockResolvedValue(step); runner = new Runner( - createRunnerConfig({ workflowPort, aiClient: aiClient as unknown as AiClient }), + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), ); await runner.triggerPoll('run-1'); @@ -686,7 +690,7 @@ describe('MCP lazy loading (via once thunk)', () => { workflowPort.getMcpServerConfigs.mockResolvedValue([{ configs: {} }] as never); runner = new Runner( - createRunnerConfig({ workflowPort, aiClient: aiClient as unknown as AiClient }), + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), ); await runner.triggerPoll('run-1'); @@ -701,9 +705,9 @@ describe('MCP lazy loading (via once thunk)', () => { describe('StepExecutorFactory.create — factory', () => { const makeContextConfig = (): StepContextConfig => ({ - aiClient: { + aiModelPort: { getModel: jest.fn().mockReturnValue({} as BaseChatModel), - } as unknown as AiClient, + } as unknown as AiModelPort, agentPort: {} as AgentPort, workflowPort: {} as WorkflowPort, runStore: {} as RunStore, @@ -777,11 +781,11 @@ describe('StepExecutorFactory.create — factory', () => { const logger = { info: jest.fn(), error: jest.fn() }; const contextConfig: StepContextConfig = { ...makeContextConfig(), - aiClient: { + aiModelPort: { getModel: jest.fn().mockImplementationOnce(() => { throw error; }), - } as unknown as AiClient, + } as unknown as AiModelPort, logger, }; @@ -799,11 +803,11 @@ describe('StepExecutorFactory.create — factory', () => { const logger = { info: jest.fn(), error: jest.fn() }; const contextConfig: StepContextConfig = { ...makeContextConfig(), - aiClient: { + aiModelPort: { getModel: jest.fn().mockImplementationOnce(() => { throw error; }), - } as unknown as AiClient, + } as unknown as AiModelPort, logger, }; @@ -834,7 +838,7 @@ describe('error handling', () => { runner = new Runner( createRunnerConfig({ workflowPort, - aiClient: aiClient as unknown as AiClient, + aiModelPort: aiClient as unknown as AiModelPort, logger: mockLogger, }), ); @@ -872,7 +876,7 @@ describe('error handling', () => { }); runner = new Runner( - createRunnerConfig({ workflowPort, aiClient: aiClient as unknown as AiClient }), + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), ); await runner.triggerPoll('run-1'); @@ -896,7 +900,7 @@ describe('error handling', () => { runner = new Runner( createRunnerConfig({ workflowPort, - aiClient: aiClient as unknown as AiClient, + aiModelPort: aiClient as unknown as AiModelPort, logger: mockLogger, }), ); @@ -925,7 +929,7 @@ describe('error handling', () => { workflowPort.updateStepExecution.mockRejectedValueOnce(new Error('update failed')); runner = new Runner( - createRunnerConfig({ workflowPort, aiClient: aiClient as unknown as AiClient }), + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), ); await expect(runner.triggerPoll('run-1')).resolves.toBeUndefined(); @@ -967,7 +971,7 @@ describe('error handling', () => { }); runner = new Runner( - createRunnerConfig({ workflowPort, aiClient: aiClient as unknown as AiClient }), + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), ); await runner.triggerPoll('run-1');