Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/workflow-executor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"dependencies": {
"@forestadmin/agent-client": "1.4.18",
"@forestadmin/ai-proxy": "1.7.2",
"@forestadmin/forestadmin-client": "1.38.2",
"@langchain/openai": "1.2.5",
"@koa/bodyparser": "^6.1.0",
"@koa/router": "^13.1.0",
Expand Down
27 changes: 27 additions & 0 deletions packages/workflow-executor/src/adapters/ai-client-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { AiModelPort } from '../ports/ai-model-port';
import type { PendingStepExecution } from '../types/execution';
import type { AiConfiguration, McpConfiguration, RemoteTool } from '@forestadmin/ai-proxy';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';

import { AiClient } from '@forestadmin/ai-proxy';

/**
 * Fulfils the AiModelPort contract with a locally configured AiClient:
 * model credentials and provider settings are supplied by the executor's
 * own configuration rather than fetched through the Forest Admin server.
 */
export default class AiClientAdapter implements AiModelPort {
  private readonly aiClient: AiClient;

  /**
   * @param aiConfigurations model configurations to register; every entry is
   *   given a default of 2 retries unless it already defines maxRetries
   *   (the spread lets the caller's value win).
   */
  constructor(aiConfigurations: AiConfiguration[]) {
    const configured = aiConfigurations.map(configuration => ({
      maxRetries: 2,
      ...configuration,
    }));

    this.aiClient = new AiClient({ aiConfigurations: configured as AiConfiguration[] });
  }

  /** Resolve the chat model named by the step definition's aiConfigName (may be undefined). */
  getModel(step: PendingStepExecution): BaseChatModel {
    const { aiConfigName } = step.stepDefinition;

    return this.aiClient.getModel(aiConfigName);
  }

  /** Delegate remote MCP tool discovery to the underlying client. */
  loadRemoteTools(config: McpConfiguration): Promise<RemoteTool[]> {
    return this.aiClient.loadRemoteTools(config);
  }

  /** Delegate connection teardown to the underlying client. */
  closeConnections(): Promise<void> {
    return this.aiClient.closeConnections();
  }
}
54 changes: 54 additions & 0 deletions packages/workflow-executor/src/adapters/server-ai-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { AiModelPort } from '../ports/ai-model-port';
import type { PendingStepExecution } from '../types/execution';
import type { McpConfiguration, RemoteTool } from '@forestadmin/ai-proxy';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';

import { AiClient } from '@forestadmin/ai-proxy';
import { ChatOpenAI } from '@langchain/openai';
import jsonwebtoken from 'jsonwebtoken';

/** Construction options for ServerAiAdapter. */
export interface ServerAiAdapterOptions {
  /** Base URL of the Forest Admin server that hosts the AI proxy endpoint. */
  forestServerUrl: string;
  /** Environment secret used to sign the per-step JWT sent as the API key. */
  envSecret: string;
}

/**
 * Fulfils the AiModelPort contract without local API keys: every chat
 * completion is routed through the Forest Admin server's AI proxy,
 * authenticated by a short-lived JWT signed with the environment secret.
 */
export default class ServerAiAdapter implements AiModelPort {
  private readonly forestServerUrl: string;
  private readonly envSecret: string;
  private readonly aiClient: AiClient;

  constructor(options: ServerAiAdapterOptions) {
    this.forestServerUrl = options.forestServerUrl;
    this.envSecret = options.envSecret;
    this.aiClient = new AiClient();
  }

  /**
   * Build a chat model whose HTTP traffic is redirected to the server-side
   * AI proxy. The signed claims identify the environment, user, run and
   * step so the server can authorize and attribute the request.
   * NOTE(review): the token expires after 1h and is minted once per
   * getModel call — presumably no single model invocation outlives that;
   * confirm for long-running steps.
   */
  getModel(step: PendingStepExecution): BaseChatModel {
    const claims = {
      envId: step.envId,
      userId: step.user.id,
      runId: step.runId,
      stepIndex: step.stepIndex,
    };
    const token = jsonwebtoken.sign(claims, this.envSecret, { expiresIn: '1h' });
    const proxyEndpoint = `${this.forestServerUrl}/liana/v1/ai-proxy`;

    return new ChatOpenAI({
      // Model has no effect — the server uses its own configured model.
      // Set here only because ChatOpenAI requires it.
      model: 'gpt-4.1',
      maxRetries: 2,
      configuration: {
        // The JWT travels in place of the OpenAI API key.
        apiKey: token,
        // Ignore the SDK-computed URL and always hit the proxy endpoint.
        fetch: (_url: RequestInfo | URL, init?: RequestInit) => fetch(proxyEndpoint, init),
      },
    });
  }

  /** Delegate remote MCP tool discovery to the shared AiClient. */
  loadRemoteTools(config: McpConfiguration): Promise<RemoteTool[]> {
    return this.aiClient.loadRemoteTools(config);
  }

  /** Delegate connection teardown to the shared AiClient. */
  closeConnections(): Promise<void> {
    return this.aiClient.closeConnections();
  }
}
13 changes: 7 additions & 6 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ import type { RunnerState } from './runner';
import type { AiConfiguration } from '@forestadmin/ai-proxy';
import type { Options as SequelizeOptions } from 'sequelize';

import { AiClient } from '@forestadmin/ai-proxy';
import { Sequelize } from 'sequelize';

import AgentClientAgentPort from './adapters/agent-client-agent-port';
import AiClientAdapter from './adapters/ai-client-adapter';
import ConsoleLogger from './adapters/console-logger';
import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ServerAiAdapter from './adapters/server-ai-adapter';
import ExecutorHttpServer from './http/executor-http-server';
import Runner from './runner';
import SchemaCache from './schema-cache';
Expand All @@ -31,7 +32,7 @@ export interface ExecutorOptions {
agentUrl: string;
httpPort: number;
forestServerUrl?: string;
aiConfigurations: AiConfiguration[];
aiConfigurations?: AiConfiguration[];
pollingIntervalMs?: number;
logger?: Logger;
stopTimeoutMs?: number;
Expand All @@ -49,9 +50,9 @@ function buildCommonDependencies(options: ExecutorOptions) {
forestServerUrl,
});

const aiClient = new AiClient({
aiConfigurations: options.aiConfigurations,
});
const aiModelPort = options.aiConfigurations?.length
? new AiClientAdapter(options.aiConfigurations)
: new ServerAiAdapter({ forestServerUrl, envSecret: options.envSecret });

const schemaCache = new SchemaCache();

Expand All @@ -65,7 +66,7 @@ function buildCommonDependencies(options: ExecutorOptions) {
agentPort,
schemaCache,
workflowPort,
aiClient,
aiModelPort,
logger,
pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS,
envSecret: options.envSecret,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { AgentPort } from '../ports/agent-port';
import type { AiModelPort } from '../ports/ai-model-port';
import type { Logger } from '../ports/logger-port';
import type { RunStore } from '../ports/run-store';
import type { WorkflowPort } from '../ports/workflow-port';
Expand All @@ -17,7 +18,7 @@ import type {
TriggerActionStepDefinition,
UpdateRecordStepDefinition,
} from '../types/step-definition';
import type { AiClient, RemoteTool } from '@forestadmin/ai-proxy';
import type { RemoteTool } from '@forestadmin/ai-proxy';

import { StepStateError, causeMessage } from '../errors';
import ConditionStepExecutor from './condition-step-executor';
Expand All @@ -30,7 +31,7 @@ import { StepType } from '../types/step-definition';
import { stepTypeToOutcomeType } from '../types/step-outcome';

export interface StepContextConfig {
aiClient: AiClient;
aiModelPort: AiModelPort;
agentPort: AgentPort;
workflowPort: WorkflowPort;
runStore: RunStore;
Expand Down Expand Up @@ -104,7 +105,7 @@ export default class StepExecutorFactory {
): ExecutionContext {
return {
...step,
model: cfg.aiClient.getModel(step.stepDefinition.aiConfigName),
model: cfg.aiModelPort.getModel(step),
agentPort: cfg.agentPort,
workflowPort: cfg.workflowPort,
runStore: cfg.runStore,
Expand Down
9 changes: 9 additions & 0 deletions packages/workflow-executor/src/ports/ai-model-port.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import type { PendingStepExecution } from '../types/execution';
import type { McpConfiguration, RemoteTool } from '@forestadmin/ai-proxy';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';

/**
 * Port abstracting where AI chat models come from — e.g. a locally
 * configured client or the Forest Admin server's AI proxy — so the
 * runner and step executors never depend on a concrete provider.
 */
export interface AiModelPort {
  /** Resolve the chat model to use for the given pending step execution. */
  getModel(step: PendingStepExecution): BaseChatModel;
  /** Discover the remote tools described by the given MCP configuration. */
  loadRemoteTools(config: McpConfiguration): Promise<RemoteTool[]>;
  /** Release any open connections held by the adapter (e.g. MCP transports). */
  closeConnections(): Promise<void>;
}
11 changes: 6 additions & 5 deletions packages/workflow-executor/src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { StepContextConfig } from './executors/step-executor-factory';
import type { AgentPort } from './ports/agent-port';
import type { AiModelPort } from './ports/ai-model-port';
import type { Logger } from './ports/logger-port';
import type { RunStore } from './ports/run-store';
import type { McpConfiguration, WorkflowPort } from './ports/workflow-port';
import type SchemaCache from './schema-cache';
import type { PendingStepExecution, StepExecutionResult } from './types/execution';
import type { StepExecutionData } from './types/step-execution-data';
import type { AiClient, RemoteTool } from '@forestadmin/ai-proxy';
import type { RemoteTool } from '@forestadmin/ai-proxy';

import ConsoleLogger from './adapters/console-logger';
import {
Expand All @@ -28,7 +29,7 @@ export interface RunnerConfig {
runStore: RunStore;
schemaCache: SchemaCache;
pollingIntervalMs: number;
aiClient: AiClient;
aiModelPort: AiModelPort;
envSecret: string;
authSecret: string;
logger?: Logger;
Expand Down Expand Up @@ -125,7 +126,7 @@ export default class Runner {

// Close resources — log failures instead of silently swallowing
const results = await Promise.allSettled([
this.config.aiClient.closeConnections(),
this.config.aiModelPort.closeConnections(),
this.config.runStore.close(this.logger),
]);

Expand Down Expand Up @@ -222,7 +223,7 @@ export default class Runner {
configs: Object.assign({}, ...configs.map(c => c.configs)),
};

return this.config.aiClient.loadRemoteTools(mergedConfig);
return this.config.aiModelPort.loadRemoteTools(mergedConfig);
}

private executeStep(step: PendingStepExecution): Promise<void> {
Expand Down Expand Up @@ -269,7 +270,7 @@ export default class Runner {

private get contextConfig(): StepContextConfig {
return {
aiClient: this.config.aiClient,
aiModelPort: this.config.aiModelPort,
agentPort: this.config.agentPort,
workflowPort: this.config.workflowPort,
runStore: this.config.runStore,
Expand Down
1 change: 1 addition & 0 deletions packages/workflow-executor/src/types/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export interface Step {
}

export interface PendingStepExecution {
readonly envId: string;
readonly runId: string;
readonly stepId: string;
readonly stepIndex: number;
Expand Down
81 changes: 81 additions & 0 deletions packages/workflow-executor/test/adapters/ai-client-adapter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { PendingStepExecution } from '../../src/types/execution';

import AiClientAdapter from '../../src/adapters/ai-client-adapter';
import { StepType } from '../../src/types/step-definition';

// Shared mock functions for the stubbed AiClient. The `mock` prefix is
// significant: jest hoists the jest.mock() factory above these const
// declarations, and only variables whose names start with "mock" may be
// referenced from inside a hoisted factory.
const mockGetModel = jest.fn().mockReturnValue({ invoke: jest.fn() });
const mockLoadRemoteTools = jest.fn().mockResolvedValue([]);
const mockCloseConnections = jest.fn().mockResolvedValue(undefined);

// Replace the real AiClient constructor with one returning the mocks above,
// so the adapter under test never touches a real provider.
jest.mock('@forestadmin/ai-proxy', () => ({
  AiClient: jest.fn().mockImplementation(() => ({
    getModel: mockGetModel,
    loadRemoteTools: mockLoadRemoteTools,
    closeConnections: mockCloseConnections,
  })),
}));

/**
 * Build a PendingStepExecution fixture with sensible defaults; pass
 * overrides to customize individual fields for a given test case.
 */
function makeStep(overrides: Partial<PendingStepExecution> = {}): PendingStepExecution {
  // Contextually typed base fixture — keeps literal fields checked
  // against PendingStepExecution without widening.
  const base: PendingStepExecution = {
    envId: 'env-1',
    runId: 'run-1',
    stepId: 'step-1',
    stepIndex: 0,
    baseRecordRef: { collectionName: 'customers', recordId: ['1'], stepIndex: 0 },
    stepDefinition: { type: StepType.ReadRecord, aiConfigName: 'my-config' },
    previousSteps: [],
    user: {
      id: 1,
      email: 'test@example.com',
      firstName: 'Test',
      lastName: 'User',
      team: 'admin',
      renderingId: 1,
      role: 'admin',
      permissionLevel: 'admin',
      tags: {},
    },
  };

  return { ...base, ...overrides };
}

describe('AiClientAdapter', () => {
  // Reset call history between cases; the mocks are module-level singletons.
  beforeEach(() => {
    jest.clearAllMocks();
  });

  it('delegates getModel to AiClient with aiConfigName from step definition', () => {
    const configurations = [
      { name: 'my-config', provider: 'openai' as const, model: 'gpt-4o', apiKey: 'sk-test' },
    ];
    const adapter = new AiClientAdapter(configurations);

    adapter.getModel(makeStep());

    expect(mockGetModel).toHaveBeenCalledWith('my-config');
  });

  it('delegates getModel without aiConfigName when not set', () => {
    const adapter = new AiClientAdapter([
      { name: 'default', provider: 'openai' as const, model: 'gpt-4o', apiKey: 'sk-test' },
    ]);
    const stepWithoutConfigName = makeStep({ stepDefinition: { type: StepType.ReadRecord } });

    adapter.getModel(stepWithoutConfigName);

    expect(mockGetModel).toHaveBeenCalledWith(undefined);
  });

  it('delegates loadRemoteTools to AiClient', async () => {
    const mcpConfig = { configs: {} };

    await new AiClientAdapter([]).loadRemoteTools(mcpConfig);

    expect(mockLoadRemoteTools).toHaveBeenCalledWith(mcpConfig);
  });

  it('delegates closeConnections to AiClient', async () => {
    await new AiClientAdapter([]).closeConnections();

    expect(mockCloseConnections).toHaveBeenCalled();
  });
});
Loading
Loading