From cf25fb084389e082b2e3ff033e77b34f9d95c732 Mon Sep 17 00:00:00 2001 From: waleed Date: Sun, 8 Feb 2026 15:11:45 -0800 Subject: [PATCH 1/7] fix(mcp): harden notification system against race conditions - Guard concurrent connect() calls in connection manager with connectingServers Set - Suppress post-disconnect notification handler firing in MCP client - Clean up Redis event listeners in pub/sub dispose() - Add tests for all three hardening fixes (11 new tests) --- apps/sim/lib/mcp/client.test.ts | 111 ++++++ apps/sim/lib/mcp/client.ts | 81 ++++- apps/sim/lib/mcp/connection-manager.test.ts | 184 ++++++++++ apps/sim/lib/mcp/connection-manager.ts | 361 ++++++++++++++++++++ apps/sim/lib/mcp/pubsub.test.ts | 93 +++++ apps/sim/lib/mcp/pubsub.ts | 209 ++++++++++++ apps/sim/lib/mcp/types.ts | 38 +++ 7 files changed, 1059 insertions(+), 18 deletions(-) create mode 100644 apps/sim/lib/mcp/client.test.ts create mode 100644 apps/sim/lib/mcp/connection-manager.test.ts create mode 100644 apps/sim/lib/mcp/connection-manager.ts create mode 100644 apps/sim/lib/mcp/pubsub.test.ts create mode 100644 apps/sim/lib/mcp/pubsub.ts diff --git a/apps/sim/lib/mcp/client.test.ts b/apps/sim/lib/mcp/client.test.ts new file mode 100644 index 0000000000..080ca391cf --- /dev/null +++ b/apps/sim/lib/mcp/client.test.ts @@ -0,0 +1,111 @@ +/** + * @vitest-environment node + */ +import { loggerMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/logger', () => loggerMock) + +/** + * Capture the notification handler registered via `client.setNotificationHandler()`. + * This lets us simulate the MCP SDK delivering a `tools/list_changed` notification. + */ +let capturedNotificationHandler: (() => Promise) | null = null + +vi.mock('@modelcontextprotocol/sdk/client/index.js', () => ({ + Client: vi.fn().mockImplementation(() => ({ + connect: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + getServerVersion: vi.fn().mockReturnValue('2025-06-18'), + getServerCapabilities: vi.fn().mockReturnValue({ tools: { listChanged: true } }), + setNotificationHandler: vi + .fn() + .mockImplementation((_schema: unknown, handler: () => Promise) => { + capturedNotificationHandler = handler + }), + listTools: vi.fn().mockResolvedValue({ tools: [] }), + })), +})) + +vi.mock('@modelcontextprotocol/sdk/client/streamableHttp.js', () => ({ + StreamableHTTPClientTransport: vi.fn().mockImplementation(() => ({ + onclose: null, + sessionId: 'test-session', + })), +})) + +vi.mock('@modelcontextprotocol/sdk/types.js', () => ({ + ToolListChangedNotificationSchema: { method: 'notifications/tools/list_changed' }, +})) + +vi.mock('@/lib/core/execution-limits', () => ({ + getMaxExecutionTimeout: vi.fn().mockReturnValue(30000), +})) + +import { McpClient } from './client' +import type { McpServerConfig } from './types' + +function createConfig(): McpServerConfig { + return { + id: 'server-1', + name: 'Test Server', + transport: 'streamable-http', + url: 'https://test.example.com/mcp', + } +} + +describe('McpClient notification handler', () => { + beforeEach(() => { + capturedNotificationHandler = null + }) + + it('fires onToolsChanged when a notification arrives while connected', async () => { + const onToolsChanged = vi.fn() + + const client = new McpClient({ + config: createConfig(), + securityPolicy: { requireConsent: false, auditLevel: 'basic' }, + onToolsChanged, + }) + + await client.connect() + + expect(capturedNotificationHandler).not.toBeNull() + + await capturedNotificationHandler!() + + expect(onToolsChanged).toHaveBeenCalledTimes(1) + expect(onToolsChanged).toHaveBeenCalledWith('server-1') + }) + + it('suppresses notifications after disconnect', async () => { + const onToolsChanged = vi.fn() + + const client = new McpClient({ + config: createConfig(), + securityPolicy: { requireConsent: false, auditLevel: 'basic' }, + onToolsChanged, + }) + + await client.connect() + expect(capturedNotificationHandler).not.toBeNull() + + await client.disconnect() + + // Simulate a late notification arriving after disconnect + await capturedNotificationHandler!() + + expect(onToolsChanged).not.toHaveBeenCalled() + }) + + it('does not register a notification handler when onToolsChanged is not provided', async () => { + const client = new McpClient({ + config: createConfig(), + securityPolicy: { requireConsent: false, auditLevel: 'basic' }, + }) + + await client.connect() + + expect(capturedNotificationHandler).toBeNull() + }) +}) diff --git a/apps/sim/lib/mcp/client.ts b/apps/sim/lib/mcp/client.ts index 56375613f5..02395ed8fc 100644 --- a/apps/sim/lib/mcp/client.ts +++ b/apps/sim/lib/mcp/client.ts @@ -10,10 +10,15 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' -import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js' +import { + type ListToolsResult, + type Tool, + ToolListChangedNotificationSchema, +} from '@modelcontextprotocol/sdk/types.js' import { createLogger } from '@sim/logger' import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { + type McpClientOptions, McpConnectionError, type McpConnectionStatus, type McpConsentRequest, @@ -24,6 +29,7 @@ import { type McpTool, type McpToolCall, type McpToolResult, + type McpToolsChangedCallback, type McpVersionInfo, } from '@/lib/mcp/types' @@ -35,6 +41,7 @@ export class McpClient { private config: McpServerConfig private connectionStatus: McpConnectionStatus private securityPolicy: McpSecurityPolicy + private onToolsChanged?: McpToolsChangedCallback private isConnected = false private static readonly SUPPORTED_VERSIONS = [ @@ -44,23 +51,36 @@ export class McpClient { ] /** - * Creates a new MCP client - * - * No session ID parameter (we disconnect after each operation). - * The SDK handles session management automatically via Mcp-Session-Id header. + * Creates a new MCP client. * - * @param config - Server configuration - * @param securityPolicy - Optional security policy + * Accepts either the legacy (config, securityPolicy?) signature + * or a single McpClientOptions object with an optional onToolsChanged callback. */ - constructor(config: McpServerConfig, securityPolicy?: McpSecurityPolicy) { - this.config = config - this.connectionStatus = { connected: false } - this.securityPolicy = securityPolicy ?? { - requireConsent: true, - auditLevel: 'basic', - maxToolExecutionsPerHour: 1000, + constructor(config: McpServerConfig, securityPolicy?: McpSecurityPolicy) + constructor(options: McpClientOptions) + constructor( + configOrOptions: McpServerConfig | McpClientOptions, + securityPolicy?: McpSecurityPolicy + ) { + if ('config' in configOrOptions) { + this.config = configOrOptions.config + this.securityPolicy = configOrOptions.securityPolicy ?? { + requireConsent: true, + auditLevel: 'basic', + maxToolExecutionsPerHour: 1000, + } + this.onToolsChanged = configOrOptions.onToolsChanged + } else { + this.config = configOrOptions + this.securityPolicy = securityPolicy ?? { + requireConsent: true, + auditLevel: 'basic', + maxToolExecutionsPerHour: 1000, + } } + this.connectionStatus = { connected: false } + if (!this.config.url) { throw new McpError('URL required for Streamable HTTP transport') } @@ -79,16 +99,15 @@ export class McpClient { { capabilities: { tools: {}, - // Resources and prompts can be added later - // resources: {}, - // prompts: {}, }, } ) } /** - * Initialize connection to MCP server + * Initialize connection to MCP server. + * If an `onToolsChanged` callback was provided, registers a notification handler + * for `notifications/tools/list_changed` after connecting. */ async connect(): Promise { logger.info(`Connecting to MCP server: ${this.config.name} (${this.config.transport})`) @@ -100,6 +119,15 @@ export class McpClient { this.connectionStatus.connected = true this.connectionStatus.lastConnected = new Date() + if (this.onToolsChanged) { + this.client.setNotificationHandler(ToolListChangedNotificationSchema, async () => { + if (!this.isConnected) return + logger.info(`[${this.config.name}] Received tools/list_changed notification`) + this.onToolsChanged?.(this.config.id) + }) + logger.info(`[${this.config.name}] Registered tools/list_changed notification handler`) + } + const serverVersion = this.client.getServerVersion() logger.info(`Successfully connected to MCP server: ${this.config.name}`, { protocolVersion: serverVersion, @@ -241,6 +269,23 @@ export class McpClient { return !!serverCapabilities?.[capability] } + /** + * Check if the server declared `capabilities.tools.listChanged: true` during initialization. + */ + hasListChangedCapability(): boolean { + const caps = this.client.getServerCapabilities() + const toolsCap = caps?.tools as Record | undefined + return !!toolsCap?.listChanged + } + + /** + * Register a callback to be invoked when the underlying transport closes. + * Used by the connection manager for reconnection logic. + */ + onClose(callback: () => void): void { + this.transport.onclose = callback + } + /** * Get server configuration */ diff --git a/apps/sim/lib/mcp/connection-manager.test.ts b/apps/sim/lib/mcp/connection-manager.test.ts new file mode 100644 index 0000000000..8a8d6bd1fc --- /dev/null +++ b/apps/sim/lib/mcp/connection-manager.test.ts @@ -0,0 +1,184 @@ +/** + * @vitest-environment node + */ +import { loggerMock } from '@sim/testing' +import { afterEach, describe, expect, it, vi } from 'vitest' + +interface MockMcpClient { + connect: ReturnType + disconnect: ReturnType + hasListChangedCapability: ReturnType + onClose: ReturnType +} + +/** Deferred promise to control when `client.connect()` resolves. */ +function createDeferred() { + let resolve!: (value: T) => void + const promise = new Promise((res) => { + resolve = res + }) + return { promise, resolve } +} + +function serverConfig(id: string, name = `Server ${id}`) { + return { + id, + name, + transport: 'streamable-http' as const, + url: `https://${id}.example.com/mcp`, + } +} + +/** Shared setup: resets modules and applies base mocks. */ +function setupBaseMocks() { + vi.resetModules() + vi.doMock('@sim/logger', () => loggerMock) + vi.doMock('@/lib/core/config/feature-flags', () => ({ isTest: false })) + vi.doMock('@/lib/mcp/pubsub', () => ({ + mcpPubSub: { onToolsChanged: vi.fn(() => vi.fn()), publishToolsChanged: vi.fn() }, + })) +} + +describe('McpConnectionManager', () => { + let manager: { connect: Function; dispose: Function } | null = null + + afterEach(() => { + manager?.dispose() + manager = null + }) + + describe('concurrent connect() guard', () => { + it('creates only one client when two connect() calls race for the same serverId', async () => { + setupBaseMocks() + + const deferred = createDeferred() + const instances: MockMcpClient[] = [] + + vi.doMock('./client', () => ({ + McpClient: vi.fn().mockImplementation(() => { + const instance: MockMcpClient = { + connect: vi.fn().mockImplementation(() => deferred.promise), + disconnect: vi.fn().mockResolvedValue(undefined), + hasListChangedCapability: vi.fn().mockReturnValue(true), + onClose: vi.fn(), + } + instances.push(instance) + return instance + }), + })) + + const { mcpConnectionManager: mgr } = await import('./connection-manager') + manager = mgr + + const config = serverConfig('server-1') + + // Fire two concurrent connect() calls for the same server + const p1 = mgr.connect(config, 'user-1', 'ws-1') + const p2 = mgr.connect(config, 'user-1', 'ws-1') + + deferred.resolve() + const [r1, r2] = await Promise.all([p1, p2]) + + // Only one McpClient should have been instantiated + expect(instances).toHaveLength(1) + expect(r1.supportsListChanged).toBe(true) + // Second call hits the connectingServers guard and returns false + expect(r2.supportsListChanged).toBe(false) + }) + + it('allows a new connect() after a previous one completes', async () => { + setupBaseMocks() + + const instances: MockMcpClient[] = [] + + vi.doMock('./client', () => ({ + McpClient: vi.fn().mockImplementation(() => { + const instance: MockMcpClient = { + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + hasListChangedCapability: vi.fn().mockReturnValue(false), + onClose: vi.fn(), + } + instances.push(instance) + return instance + }), + })) + + const { mcpConnectionManager: mgr } = await import('./connection-manager') + manager = mgr + + const config = serverConfig('server-2') + + // First connect — server doesn't support listChanged, disconnects immediately + const r1 = await mgr.connect(config, 'user-1', 'ws-1') + expect(r1.supportsListChanged).toBe(false) + + // connectingServers cleaned up via finally, so second connect proceeds + const r2 = await mgr.connect(config, 'user-1', 'ws-1') + expect(r2.supportsListChanged).toBe(false) + + expect(instances).toHaveLength(2) + }) + + it('cleans up connectingServers when connect() throws', async () => { + setupBaseMocks() + + let callCount = 0 + const instances: MockMcpClient[] = [] + + vi.doMock('./client', () => ({ + McpClient: vi.fn().mockImplementation(() => { + callCount++ + const instance: MockMcpClient = { + connect: + callCount === 1 + ? vi.fn().mockRejectedValue(new Error('Connection refused')) + : vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + hasListChangedCapability: vi.fn().mockReturnValue(true), + onClose: vi.fn(), + } + instances.push(instance) + return instance + }), + })) + + const { mcpConnectionManager: mgr } = await import('./connection-manager') + manager = mgr + + const config = serverConfig('server-3') + + // First connect fails + const r1 = await mgr.connect(config, 'user-1', 'ws-1') + expect(r1.supportsListChanged).toBe(false) + + // Second connect should NOT be blocked by a stale connectingServers entry + const r2 = await mgr.connect(config, 'user-1', 'ws-1') + expect(r2.supportsListChanged).toBe(true) + expect(instances).toHaveLength(2) + }) + }) + + describe('dispose', () => { + it('rejects new connections after dispose', async () => { + setupBaseMocks() + + vi.doMock('./client', () => ({ + McpClient: vi.fn().mockImplementation(() => ({ + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + hasListChangedCapability: vi.fn().mockReturnValue(true), + onClose: vi.fn(), + })), + })) + + const { mcpConnectionManager: mgr } = await import('./connection-manager') + manager = mgr + + mgr.dispose() + + const result = await mgr.connect(serverConfig('server-4'), 'user-1', 'ws-1') + expect(result.supportsListChanged).toBe(false) + }) + }) +}) diff --git a/apps/sim/lib/mcp/connection-manager.ts b/apps/sim/lib/mcp/connection-manager.ts new file mode 100644 index 0000000000..08635f4b4a --- /dev/null +++ b/apps/sim/lib/mcp/connection-manager.ts @@ -0,0 +1,361 @@ +/** + * MCP Connection Manager + * + * Maintains persistent connections to MCP servers that support + * `notifications/tools/list_changed`. When a notification arrives, + * the manager invalidates the tools cache and emits a ToolsChangedEvent + * so the frontend SSE endpoint can push updates to browsers. + * + * Servers that do not support `listChanged` fall back to the existing + * stale-time cache approach — no persistent connection is kept. + */ + +import { createLogger } from '@sim/logger' +import { isTest } from '@/lib/core/config/feature-flags' +import { McpClient } from '@/lib/mcp/client' +import { mcpPubSub } from '@/lib/mcp/pubsub' +import type { + ManagedConnectionState, + McpServerConfig, + McpToolsChangedCallback, + ToolsChangedEvent, +} from '@/lib/mcp/types' + +const logger = createLogger('McpConnectionManager') + +const MAX_CONNECTIONS = 50 +const MAX_RECONNECT_ATTEMPTS = 10 +const BASE_RECONNECT_DELAY_MS = 1000 +const IDLE_TIMEOUT_MS = 30 * 60 * 1000 // 30 minutes +const IDLE_CHECK_INTERVAL_MS = 5 * 60 * 1000 // 5 minutes + +type ToolsChangedListener = (event: ToolsChangedEvent) => void + +class McpConnectionManager { + private connections = new Map() + private states = new Map() + private reconnectTimers = new Map>() + private listeners = new Set() + private connectingServers = new Set() + private idleCheckTimer: ReturnType | null = null + private disposed = false + private unsubscribePubSub?: () => void + + constructor() { + if (mcpPubSub) { + this.unsubscribePubSub = mcpPubSub.onToolsChanged((event) => { + this.notifyLocalListeners(event) + }) + } + } + + /** + * Subscribe to tools-changed events from any managed connection. + * Returns an unsubscribe function. + */ + subscribe(listener: ToolsChangedListener): () => void { + this.listeners.add(listener) + return () => { + this.listeners.delete(listener) + } + } + + /** + * Establish a persistent connection to an MCP server. + * If the server supports `listChanged`, the connection is kept alive + * and notifications are forwarded to subscribers. + * + * If the server does NOT support `listChanged`, the client is disconnected + * immediately — there's nothing to listen for. + */ + async connect( + config: McpServerConfig, + userId: string, + workspaceId: string + ): Promise<{ supportsListChanged: boolean }> { + if (this.disposed) { + logger.warn('Connection manager is disposed, ignoring connect request') + return { supportsListChanged: false } + } + + const serverId = config.id + + if (this.connections.has(serverId) || this.connectingServers.has(serverId)) { + logger.info(`[${config.name}] Already has a managed connection or is connecting, skipping`) + const state = this.states.get(serverId) + return { supportsListChanged: state?.supportsListChanged ?? false } + } + + if (this.connections.size >= MAX_CONNECTIONS) { + logger.warn(`Max connections (${MAX_CONNECTIONS}) reached, cannot connect to ${config.name}`) + return { supportsListChanged: false } + } + + this.connectingServers.add(serverId) + + try { + const onToolsChanged: McpToolsChangedCallback = (sid) => { + this.handleToolsChanged(sid) + } + + const client = new McpClient({ + config, + securityPolicy: { + requireConsent: false, + auditLevel: 'basic', + maxToolExecutionsPerHour: 1000, + }, + onToolsChanged, + }) + + try { + await client.connect() + } catch (error) { + logger.error(`[${config.name}] Failed to connect for persistent monitoring:`, error) + return { supportsListChanged: false } + } + + const supportsListChanged = client.hasListChangedCapability() + + if (!supportsListChanged) { + logger.info( + `[${config.name}] Server does not support listChanged — disconnecting (fallback to cache)` + ) + await client.disconnect() + return { supportsListChanged: false } + } + + this.connections.set(serverId, client) + this.states.set(serverId, { + serverId, + serverName: config.name, + workspaceId, + userId, + connected: true, + supportsListChanged: true, + reconnectAttempts: 0, + lastActivity: Date.now(), + }) + + client.onClose(() => { + this.handleDisconnect(config, userId, workspaceId) + }) + + this.ensureIdleCheck() + + logger.info(`[${config.name}] Persistent connection established (listChanged supported)`) + return { supportsListChanged: true } + } finally { + this.connectingServers.delete(serverId) + } + } + + /** + * Disconnect a managed connection. + */ + async disconnect(serverId: string): Promise { + this.clearReconnectTimer(serverId) + + const client = this.connections.get(serverId) + if (client) { + try { + await client.disconnect() + } catch (error) { + logger.warn(`Error disconnecting managed client ${serverId}:`, error) + } + this.connections.delete(serverId) + } + + this.states.delete(serverId) + logger.info(`Managed connection removed: ${serverId}`) + } + + /** + * Check whether a managed connection exists for the given server. + */ + hasConnection(serverId: string): boolean { + return this.connections.has(serverId) + } + + /** + * Get connection state for a server. + */ + getState(serverId: string): ManagedConnectionState | undefined { + return this.states.get(serverId) + } + + /** + * Get all managed connection states (for diagnostics). + */ + getAllStates(): ManagedConnectionState[] { + return [...this.states.values()] + } + + /** + * Dispose all connections and timers. + */ + dispose(): void { + this.disposed = true + + this.unsubscribePubSub?.() + + for (const timer of this.reconnectTimers.values()) { + clearTimeout(timer) + } + this.reconnectTimers.clear() + + if (this.idleCheckTimer) { + clearInterval(this.idleCheckTimer) + this.idleCheckTimer = null + } + + const disconnects = [...this.connections.entries()].map(async ([id, client]) => { + try { + await client.disconnect() + } catch (error) { + logger.warn(`Error disconnecting ${id} during dispose:`, error) + } + }) + + Promise.allSettled(disconnects).then(() => { + logger.info('Connection manager disposed') + }) + + this.connections.clear() + this.states.clear() + this.listeners.clear() + this.connectingServers.clear() + } + + /** + * Notify only process-local listeners. + * Called by the pub/sub subscription (receives events from all processes). + */ + private notifyLocalListeners(event: ToolsChangedEvent): void { + for (const listener of this.listeners) { + try { + listener(event) + } catch (error) { + logger.error('Error in tools-changed listener:', error) + } + } + } + + /** + * Handle a tools/list_changed notification from an external MCP server. + * Publishes to pub/sub so all processes are notified. + */ + private handleToolsChanged(serverId: string): void { + const state = this.states.get(serverId) + if (!state) return + + state.lastActivity = Date.now() + + const event: ToolsChangedEvent = { + serverId, + serverName: state.serverName, + workspaceId: state.workspaceId, + timestamp: Date.now(), + } + + logger.info(`[${state.serverName}] Tools changed — publishing to pub/sub`) + + mcpPubSub?.publishToolsChanged(event) + } + + private handleDisconnect(config: McpServerConfig, userId: string, workspaceId: string): void { + const serverId = config.id + const state = this.states.get(serverId) + + if (!state || this.disposed) return + + state.connected = false + this.connections.delete(serverId) + + logger.warn(`[${config.name}] Persistent connection lost, scheduling reconnect`) + + this.scheduleReconnect(config, userId, workspaceId) + } + + private scheduleReconnect(config: McpServerConfig, userId: string, workspaceId: string): void { + const serverId = config.id + const state = this.states.get(serverId) + + if (!state || this.disposed) return + + if (state.reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { + logger.error( + `[${config.name}] Max reconnect attempts (${MAX_RECONNECT_ATTEMPTS}) reached — giving up` + ) + this.states.delete(serverId) + return + } + + const delay = Math.min(BASE_RECONNECT_DELAY_MS * 2 ** state.reconnectAttempts, 60_000) + state.reconnectAttempts++ + + logger.info( + `[${config.name}] Reconnecting in ${delay}ms (attempt ${state.reconnectAttempts}/${MAX_RECONNECT_ATTEMPTS})` + ) + + this.clearReconnectTimer(serverId) + + const timer = setTimeout(async () => { + this.reconnectTimers.delete(serverId) + + if (this.disposed) return + + try { + this.connections.delete(serverId) + this.states.delete(serverId) + + const result = await this.connect(config, userId, workspaceId) + if (result.supportsListChanged) { + const newState = this.states.get(serverId) + if (newState) { + newState.reconnectAttempts = 0 + } + logger.info(`[${config.name}] Reconnected successfully`) + } + } catch (error) { + logger.error(`[${config.name}] Reconnect failed:`, error) + this.scheduleReconnect(config, userId, workspaceId) + } + }, delay) + + this.reconnectTimers.set(serverId, timer) + } + + private clearReconnectTimer(serverId: string): void { + const timer = this.reconnectTimers.get(serverId) + if (timer) { + clearTimeout(timer) + this.reconnectTimers.delete(serverId) + } + } + + private ensureIdleCheck(): void { + if (this.idleCheckTimer) return + + this.idleCheckTimer = setInterval(() => { + const now = Date.now() + for (const [serverId, state] of this.states) { + if (now - state.lastActivity > IDLE_TIMEOUT_MS) { + logger.info( + `[${state.serverName}] Idle timeout reached, disconnecting managed connection` + ) + this.disconnect(serverId) + } + } + + if (this.states.size === 0 && this.idleCheckTimer) { + clearInterval(this.idleCheckTimer) + this.idleCheckTimer = null + } + }, IDLE_CHECK_INTERVAL_MS) + } +} + +export const mcpConnectionManager = isTest + ? (null as unknown as McpConnectionManager) + : new McpConnectionManager() diff --git a/apps/sim/lib/mcp/pubsub.test.ts b/apps/sim/lib/mcp/pubsub.test.ts new file mode 100644 index 0000000000..7f7373e3e0 --- /dev/null +++ b/apps/sim/lib/mcp/pubsub.test.ts @@ -0,0 +1,93 @@ +/** + * @vitest-environment node + */ +import { createMockRedis, loggerMock, type MockRedis } from '@sim/testing' +import { describe, expect, it, vi } from 'vitest' + +/** Extend the @sim/testing Redis mock with the methods RedisMcpPubSub uses. */ +function createPubSubRedis(): MockRedis & { removeAllListeners: ReturnType } { + const mock = createMockRedis() + // ioredis subscribe invokes a callback as the last argument + mock.subscribe.mockImplementation((...args: unknown[]) => { + const cb = args[args.length - 1] + if (typeof cb === 'function') (cb as (err: null) => void)(null) + }) + // on() returns `this` for chaining in ioredis + mock.on.mockReturnThis() + return { ...mock, removeAllListeners: vi.fn().mockReturnThis() } +} + +/** Shared setup: resets modules and applies base mocks. Returns the two Redis instances. */ +async function setupPubSub() { + const instances: ReturnType[] = [] + + vi.resetModules() + vi.doMock('@sim/logger', () => loggerMock) + vi.doMock('@/lib/core/config/env', () => ({ env: { REDIS_URL: 'redis://localhost:6379' } })) + vi.doMock('ioredis', () => ({ + default: vi.fn().mockImplementation(() => { + const instance = createPubSubRedis() + instances.push(instance) + return instance + }), + })) + + const { mcpPubSub } = await import('./pubsub') + const [pub, sub] = instances + + return { mcpPubSub, pub, sub, instances } +} + +describe('RedisMcpPubSub', () => { + it('creates two Redis clients (pub and sub)', async () => { + const { mcpPubSub, instances } = await setupPubSub() + + expect(instances).toHaveLength(2) + mcpPubSub.dispose() + }) + + it('registers error, connect, and message listeners', async () => { + const { mcpPubSub, pub, sub } = await setupPubSub() + + const pubEvents = pub.on.mock.calls.map((c: unknown[]) => c[0]) + const subEvents = sub.on.mock.calls.map((c: unknown[]) => c[0]) + + expect(pubEvents).toContain('error') + expect(pubEvents).toContain('connect') + expect(subEvents).toContain('error') + expect(subEvents).toContain('connect') + expect(subEvents).toContain('message') + + mcpPubSub.dispose() + }) + + describe('dispose', () => { + it('calls removeAllListeners on both pub and sub before quit', async () => { + const { mcpPubSub, pub, sub } = await setupPubSub() + + mcpPubSub.dispose() + + expect(pub.removeAllListeners).toHaveBeenCalledTimes(1) + expect(sub.removeAllListeners).toHaveBeenCalledTimes(1) + expect(sub.unsubscribe).toHaveBeenCalledTimes(1) + expect(pub.quit).toHaveBeenCalledTimes(1) + expect(sub.quit).toHaveBeenCalledTimes(1) + }) + + it('drops publish calls after dispose', async () => { + const { mcpPubSub, pub } = await setupPubSub() + + mcpPubSub.dispose() + pub.publish.mockClear() + + mcpPubSub.publishToolsChanged({ + serverId: 'srv-1', + serverName: 'Test', + workspaceId: 'ws-1', + timestamp: Date.now(), + }) + + expect(pub.publish).not.toHaveBeenCalled() + }) + }) +}) diff --git a/apps/sim/lib/mcp/pubsub.ts b/apps/sim/lib/mcp/pubsub.ts new file mode 100644 index 0000000000..b7e6097c67 --- /dev/null +++ b/apps/sim/lib/mcp/pubsub.ts @@ -0,0 +1,209 @@ +/** + * MCP Pub/Sub Adapter + * + * Broadcasts MCP notification events across processes using Redis Pub/Sub. + * Gracefully falls back to process-local EventEmitter when Redis is unavailable. + * + * Two channels: + * - `mcp:tools_changed` — external MCP server sent a listChanged notification + * (published by connection manager, consumed by events SSE endpoint) + * - `mcp:workflow_tools_changed` — workflow CRUD modified a workflow MCP server's tools + * (published by serve route, consumed by serve route on other processes to push to local SSE clients) + */ + +import { EventEmitter } from 'events' +import { createLogger } from '@sim/logger' +import Redis from 'ioredis' +import { env } from '@/lib/core/config/env' +import type { ToolsChangedEvent } from '@/lib/mcp/types' + +const logger = createLogger('McpPubSub') + +const CHANNEL_TOOLS_CHANGED = 'mcp:tools_changed' +const CHANNEL_WORKFLOW_TOOLS_CHANGED = 'mcp:workflow_tools_changed' + +export interface WorkflowToolsChangedEvent { + serverId: string + workspaceId: string +} + +type ToolsChangedHandler = (event: ToolsChangedEvent) => void +type WorkflowToolsChangedHandler = (event: WorkflowToolsChangedEvent) => void + +interface McpPubSubAdapter { + publishToolsChanged(event: ToolsChangedEvent): void + publishWorkflowToolsChanged(event: WorkflowToolsChangedEvent): void + onToolsChanged(handler: ToolsChangedHandler): () => void + onWorkflowToolsChanged(handler: WorkflowToolsChangedHandler): () => void + dispose(): void +} + +/** + * Redis-backed pub/sub adapter. + * Uses dedicated pub and sub clients (ioredis requires separate connections for subscribers). + */ +class RedisMcpPubSub implements McpPubSubAdapter { + private pub: Redis + private sub: Redis + private toolsChangedHandlers = new Set() + private workflowToolsChangedHandlers = new Set() + private disposed = false + + constructor(redisUrl: string) { + const commonOpts = { + keepAlive: 1000, + connectTimeout: 10000, + maxRetriesPerRequest: null as unknown as number, + enableOfflineQueue: true, + retryStrategy: (times: number) => { + if (times > 10) return 30000 + return Math.min(times * 500, 5000) + }, + } + + this.pub = new Redis(redisUrl, { ...commonOpts, connectionName: 'mcp-pubsub-pub' }) + this.sub = new Redis(redisUrl, { ...commonOpts, connectionName: 'mcp-pubsub-sub' }) + + this.pub.on('error', (err) => logger.error('MCP pub/sub publish client error:', err.message)) + this.sub.on('error', (err) => logger.error('MCP pub/sub subscribe client error:', err.message)) + this.pub.on('connect', () => logger.info('MCP pub/sub publish client connected')) + this.sub.on('connect', () => logger.info('MCP pub/sub subscribe client connected')) + + this.sub.subscribe(CHANNEL_TOOLS_CHANGED, CHANNEL_WORKFLOW_TOOLS_CHANGED, (err) => { + if (err) { + logger.error('Failed to subscribe to MCP pub/sub channels:', err) + } else { + logger.info('Subscribed to MCP pub/sub channels') + } + }) + + this.sub.on('message', (channel: string, message: string) => { + try { + const parsed = JSON.parse(message) + if (channel === CHANNEL_TOOLS_CHANGED) { + for (const handler of this.toolsChangedHandlers) { + try { + handler(parsed as ToolsChangedEvent) + } catch (err) { + logger.error('Error in tools_changed handler:', err) + } + } + } else if (channel === CHANNEL_WORKFLOW_TOOLS_CHANGED) { + for (const handler of this.workflowToolsChangedHandlers) { + try { + handler(parsed as WorkflowToolsChangedEvent) + } catch (err) { + logger.error('Error in workflow_tools_changed handler:', err) + } + } + } + } catch (err) { + logger.error('Failed to parse pub/sub message:', err) + } + }) + } + + publishToolsChanged(event: ToolsChangedEvent): void { + if (this.disposed) return + this.pub.publish(CHANNEL_TOOLS_CHANGED, JSON.stringify(event)).catch((err) => { + logger.error('Failed to publish tools_changed:', err) + }) + } + + publishWorkflowToolsChanged(event: WorkflowToolsChangedEvent): void { + if (this.disposed) return + this.pub.publish(CHANNEL_WORKFLOW_TOOLS_CHANGED, JSON.stringify(event)).catch((err) => { + logger.error('Failed to publish workflow_tools_changed:', err) + }) + } + + onToolsChanged(handler: ToolsChangedHandler): () => void { + this.toolsChangedHandlers.add(handler) + return () => { + this.toolsChangedHandlers.delete(handler) + } + } + + onWorkflowToolsChanged(handler: WorkflowToolsChangedHandler): () => void { + this.workflowToolsChangedHandlers.add(handler) + return () => { + this.workflowToolsChangedHandlers.delete(handler) + } + } + + dispose(): void { + this.disposed = true + this.toolsChangedHandlers.clear() + this.workflowToolsChangedHandlers.clear() + + this.pub.removeAllListeners() + this.sub.removeAllListeners() + + this.sub.unsubscribe().catch(() => {}) + this.pub.quit().catch(() => {}) + this.sub.quit().catch(() => {}) + logger.info('Redis MCP pub/sub disposed') + } +} + +/** + * Process-local fallback using EventEmitter. + * Used when Redis is not configured — notifications only reach listeners in the same process. + */ +class LocalMcpPubSub implements McpPubSubAdapter { + private emitter = new EventEmitter() + + constructor() { + this.emitter.setMaxListeners(100) + logger.info('MCP pub/sub: Using process-local EventEmitter (Redis not configured)') + } + + publishToolsChanged(event: ToolsChangedEvent): void { + this.emitter.emit(CHANNEL_TOOLS_CHANGED, event) + } + + publishWorkflowToolsChanged(event: WorkflowToolsChangedEvent): void { + this.emitter.emit(CHANNEL_WORKFLOW_TOOLS_CHANGED, event) + } + + onToolsChanged(handler: ToolsChangedHandler): () => void { + this.emitter.on(CHANNEL_TOOLS_CHANGED, handler) + return () => { + this.emitter.off(CHANNEL_TOOLS_CHANGED, handler) + } + } + + onWorkflowToolsChanged(handler: WorkflowToolsChangedHandler): () => void { + this.emitter.on(CHANNEL_WORKFLOW_TOOLS_CHANGED, handler) + return () => { + this.emitter.off(CHANNEL_WORKFLOW_TOOLS_CHANGED, handler) + } + } + + dispose(): void { + this.emitter.removeAllListeners() + logger.info('Local MCP pub/sub disposed') + } +} + +/** + * Create the appropriate pub/sub adapter based on Redis availability. + */ +function createMcpPubSub(): McpPubSubAdapter { + const redisUrl = env.REDIS_URL + + if (redisUrl) { + try { + logger.info('MCP pub/sub: Using Redis') + return new RedisMcpPubSub(redisUrl) + } catch (err) { + logger.error('Failed to create Redis pub/sub, falling back to local:', err) + return new LocalMcpPubSub() + } + } + + return new LocalMcpPubSub() +} + +export const mcpPubSub: McpPubSubAdapter = + typeof window !== 'undefined' ? (null as unknown as McpPubSubAdapter) : createMcpPubSub() diff --git a/apps/sim/lib/mcp/types.ts b/apps/sim/lib/mcp/types.ts index f9e7948f04..a0a2848df9 100644 --- a/apps/sim/lib/mcp/types.ts +++ b/apps/sim/lib/mcp/types.ts @@ -147,6 +147,44 @@ export interface McpServerSummary { error?: string } +/** + * Callback invoked when an MCP server sends a `notifications/tools/list_changed` notification. + */ +export type McpToolsChangedCallback = (serverId: string) => void + +/** + * Options for creating an McpClient with notification support. + */ +export interface McpClientOptions { + config: McpServerConfig + securityPolicy?: McpSecurityPolicy + onToolsChanged?: McpToolsChangedCallback +} + +/** + * Event emitted by the connection manager when a server's tools change. + */ +export interface ToolsChangedEvent { + serverId: string + serverName: string + workspaceId: string + timestamp: number +} + +/** + * State of a managed persistent connection. + */ +export interface ManagedConnectionState { + serverId: string + serverName: string + workspaceId: string + userId: string + connected: boolean + supportsListChanged: boolean + reconnectAttempts: number + lastActivity: number +} + export interface McpApiResponse { success: boolean data?: T From 7c6927d6437ccfb6528815d50366836e7ae0a017 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 9 Feb 2026 09:59:51 -0800 Subject: [PATCH 2/7] updated tests --- apps/sim/lib/mcp/client.test.ts | 2 -- apps/sim/lib/mcp/connection-manager.test.ts | 12 ++++-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/apps/sim/lib/mcp/client.test.ts b/apps/sim/lib/mcp/client.test.ts index 080ca391cf..386769d41e 100644 --- a/apps/sim/lib/mcp/client.test.ts +++ b/apps/sim/lib/mcp/client.test.ts @@ -91,8 +91,6 @@ describe('McpClient notification handler', () => { expect(capturedNotificationHandler).not.toBeNull() await client.disconnect() - - // Simulate a late notification arriving after disconnect await capturedNotificationHandler!() expect(onToolsChanged).not.toHaveBeenCalled() diff --git a/apps/sim/lib/mcp/connection-manager.test.ts b/apps/sim/lib/mcp/connection-manager.test.ts index 8a8d6bd1fc..4badbdde55 100644 --- a/apps/sim/lib/mcp/connection-manager.test.ts +++ b/apps/sim/lib/mcp/connection-manager.test.ts @@ -40,7 +40,10 @@ function setupBaseMocks() { } describe('McpConnectionManager', () => { - let manager: { connect: Function; dispose: Function } | null = null + let manager: { + connect: (...args: unknown[]) => Promise<{ supportsListChanged: boolean }> + dispose: () => void + } | null = null afterEach(() => { manager?.dispose() @@ -72,17 +75,14 @@ describe('McpConnectionManager', () => { const config = serverConfig('server-1') - // Fire two concurrent connect() calls for the same server const p1 = mgr.connect(config, 'user-1', 'ws-1') const p2 = mgr.connect(config, 'user-1', 'ws-1') deferred.resolve() const [r1, r2] = await Promise.all([p1, p2]) - // Only one McpClient should have been instantiated expect(instances).toHaveLength(1) expect(r1.supportsListChanged).toBe(true) - // Second call hits the connectingServers guard and returns false expect(r2.supportsListChanged).toBe(false) }) @@ -109,11 +109,9 @@ describe('McpConnectionManager', () => { const config = serverConfig('server-2') - // First connect — server doesn't support listChanged, disconnects immediately const r1 = await mgr.connect(config, 'user-1', 'ws-1') expect(r1.supportsListChanged).toBe(false) - // connectingServers cleaned up via finally, so second connect proceeds const r2 = await mgr.connect(config, 'user-1', 'ws-1') expect(r2.supportsListChanged).toBe(false) @@ -148,11 +146,9 @@ describe('McpConnectionManager', () => { const config = serverConfig('server-3') - // First connect fails const r1 = await mgr.connect(config, 'user-1', 'ws-1') expect(r1.supportsListChanged).toBe(false) - // Second connect should NOT be blocked by a stale connectingServers entry const r2 = await mgr.connect(config, 'user-1', 'ws-1') expect(r2.supportsListChanged).toBe(true) expect(instances).toHaveLength(2) From f844e2570fcda64db783df62ad36b4a878115786 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 9 Feb 2026 10:48:43 -0800 Subject: [PATCH 3/7] plugged in new mcp event based system and create sse route to publish notifs --- apps/sim/app/api/mcp/events/route.test.ts | 98 ++++++++++++++++ apps/sim/app/api/mcp/events/route.ts | 111 ++++++++++++++++++ .../api/mcp/workflow-servers/[id]/route.ts | 3 + .../[id]/tools/[toolId]/route.ts | 5 + .../mcp/workflow-servers/[id]/tools/route.ts | 3 + .../sim/app/api/mcp/workflow-servers/route.ts | 5 + .../components/tool-input/tool-input.tsx | 8 +- apps/sim/hooks/queries/mcp.ts | 63 ++++++++++ apps/sim/lib/mcp/pubsub.ts | 7 +- apps/sim/lib/mcp/service.ts | 26 +++- apps/sim/lib/mcp/types.ts | 8 ++ 11 files changed, 329 insertions(+), 8 deletions(-) create mode 100644 apps/sim/app/api/mcp/events/route.test.ts create mode 100644 apps/sim/app/api/mcp/events/route.ts diff --git a/apps/sim/app/api/mcp/events/route.test.ts b/apps/sim/app/api/mcp/events/route.test.ts new file mode 100644 index 0000000000..f3db4d5754 --- /dev/null +++ b/apps/sim/app/api/mcp/events/route.test.ts @@ -0,0 +1,98 @@ +/** + * Tests for MCP SSE events endpoint + * + * @vitest-environment node + */ +import { createMockRequest, mockAuth, mockConsoleLogger } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +mockConsoleLogger() +const auth = mockAuth() + +const mockGetUserEntityPermissions = vi.fn() +vi.doMock('@/lib/workspaces/permissions/utils', () => ({ + getUserEntityPermissions: mockGetUserEntityPermissions, +})) + +vi.doMock('@/lib/mcp/connection-manager', () => ({ + mcpConnectionManager: null, +})) + +vi.doMock('@/lib/mcp/pubsub', () => ({ + mcpPubSub: null, +})) + +const { GET } = await import('./route') + +describe('MCP Events SSE Endpoint', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('returns 401 when session is missing', async () => { + auth.setUnauthenticated() + + const request = createMockRequest( + 'GET', + undefined, + {}, + 'http://localhost:3000/api/mcp/events?workspaceId=ws-123' + ) + + const response = await GET(request as any) + + expect(response.status).toBe(401) + const text = await response.text() + expect(text).toBe('Unauthorized') + }) + + it('returns 400 when workspaceId is missing', async () => { + auth.setAuthenticated() + + const request = createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/mcp/events') + + const response = await GET(request as any) + + expect(response.status).toBe(400) + const text = await response.text() + expect(text).toBe('Missing workspaceId query parameter') + }) + + it('returns 403 when user lacks workspace access', async () => { + auth.setAuthenticated() + mockGetUserEntityPermissions.mockResolvedValue(null) + + const request = createMockRequest( + 'GET', + undefined, + {}, + 'http://localhost:3000/api/mcp/events?workspaceId=ws-123' + ) + + const response = await GET(request as any) + + expect(response.status).toBe(403) + const text = await response.text() + expect(text).toBe('Access denied to workspace') + expect(mockGetUserEntityPermissions).toHaveBeenCalledWith('user-123', 'workspace', 'ws-123') + }) + + it('returns SSE stream when authorized', async () => { + auth.setAuthenticated() + mockGetUserEntityPermissions.mockResolvedValue({ read: true }) + + const request = createMockRequest( + 'GET', + undefined, + {}, + 'http://localhost:3000/api/mcp/events?workspaceId=ws-123' + ) + + const response = await GET(request as any) + + expect(response.status).toBe(200) + expect(response.headers.get('Content-Type')).toBe('text/event-stream') + expect(response.headers.get('Cache-Control')).toBe('no-cache') + expect(response.headers.get('Connection')).toBe('keep-alive') + }) +}) diff --git a/apps/sim/app/api/mcp/events/route.ts b/apps/sim/app/api/mcp/events/route.ts new file mode 100644 index 0000000000..6df91db5c0 --- /dev/null +++ b/apps/sim/app/api/mcp/events/route.ts @@ -0,0 +1,111 @@ +/** + * SSE endpoint for MCP tool-change events. + * + * Pushes `tools_changed` events to the browser when: + * - An external MCP server sends `notifications/tools/list_changed` (via connection manager) + * - A workflow CRUD route modifies workflow MCP server tools (via pub/sub) + * + * Auth is handled via session cookies (EventSource sends cookies automatically). + */ + +import { createLogger } from '@sim/logger' +import type { NextRequest } from 'next/server' +import { getSession } from '@/lib/auth' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { mcpConnectionManager } from '@/lib/mcp/connection-manager' +import { mcpPubSub } from '@/lib/mcp/pubsub' +import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' + +const logger = createLogger('McpEventsSSE') + +export const dynamic = 'force-dynamic' + +const HEARTBEAT_INTERVAL_MS = 30_000 + +export async function GET(request: NextRequest) { + const session = await getSession() + if (!session?.user?.id) { + return new Response('Unauthorized', { status: 401 }) + } + + const { searchParams } = new URL(request.url) + const workspaceId = searchParams.get('workspaceId') + if (!workspaceId) { + return new Response('Missing workspaceId query parameter', { status: 400 }) + } + + const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId) + if (!permissions) { + return new Response('Access denied to workspace', { status: 403 }) + } + + const encoder = new TextEncoder() + const unsubscribers: Array<() => void> = [] + + const stream = new ReadableStream({ + start(controller) { + const send = (eventName: string, data: Record) => { + try { + controller.enqueue( + encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`) + ) + } catch { + // Stream already closed + } + } + + // Subscribe to external MCP server tool changes + if (mcpConnectionManager) { + const unsub = mcpConnectionManager.subscribe((event) => { + if (event.workspaceId !== workspaceId) return + send('tools_changed', { + source: 'external', + serverId: event.serverId, + timestamp: event.timestamp, + }) + }) + unsubscribers.push(unsub) + } + + // Subscribe to workflow CRUD tool changes + if (mcpPubSub) { + const unsub = mcpPubSub.onWorkflowToolsChanged((event) => { + if (event.workspaceId !== workspaceId) return + send('tools_changed', { + source: 'workflow', + serverId: event.serverId, + timestamp: Date.now(), + }) + }) + unsubscribers.push(unsub) + } + + // Heartbeat to keep the connection alive + const heartbeat = setInterval(() => { + try { + controller.enqueue(encoder.encode(': heartbeat\n\n')) + } catch { + clearInterval(heartbeat) + } + }, HEARTBEAT_INTERVAL_MS) + unsubscribers.push(() => clearInterval(heartbeat)) + + // Cleanup when client disconnects + request.signal.addEventListener('abort', () => { + for (const unsub of unsubscribers) { + unsub() + } + try { + controller.close() + } catch { + // Already closed + } + logger.info(`SSE connection closed for workspace ${workspaceId}`) + }) + + logger.info(`SSE connection opened for workspace ${workspaceId}`) + }, + }) + + return new Response(stream, { headers: SSE_HEADERS }) +} diff --git a/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts b/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts index 3ce0e00455..e0a1f085eb 100644 --- a/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/[id]/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' const logger = createLogger('WorkflowMcpServerAPI') @@ -146,6 +147,8 @@ export const DELETE = withMcpAuth('admin')( logger.info(`[${requestId}] Successfully deleted workflow MCP server: ${serverId}`) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ message: `Server ${serverId} deleted successfully` }) } catch (error) { logger.error(`[${requestId}] Error deleting workflow MCP server:`, error) diff --git a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts index d7fd532590..87113b868a 100644 --- a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' @@ -115,6 +116,8 @@ export const PATCH = withMcpAuth('write')( logger.info(`[${requestId}] Successfully updated tool ${toolId}`) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ tool: updatedTool }) } catch (error) { logger.error(`[${requestId}] Error updating tool:`, error) @@ -160,6 +163,8 @@ export const DELETE = withMcpAuth('write')( logger.info(`[${requestId}] Successfully deleted tool ${toolId}`) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ message: `Tool ${toolId} deleted successfully` }) } catch (error) { logger.error(`[${requestId}] Error deleting tool:`, error) diff --git a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts index b2cef8ee5b..6705d52989 100644 --- a/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server' @@ -188,6 +189,8 @@ export const POST = withMcpAuth('write')( `[${requestId}] Successfully added tool ${toolName} (workflow: ${body.workflowId}) to server ${serverId}` ) + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + return createMcpSuccessResponse({ tool }, 201) } catch (error) { logger.error(`[${requestId}] Error adding tool:`, error) diff --git a/apps/sim/app/api/mcp/workflow-servers/route.ts b/apps/sim/app/api/mcp/workflow-servers/route.ts index e2900f5a88..1779e51a9b 100644 --- a/apps/sim/app/api/mcp/workflow-servers/route.ts +++ b/apps/sim/app/api/mcp/workflow-servers/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { eq, inArray, sql } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' +import { mcpPubSub } from '@/lib/mcp/pubsub' import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils' import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server' @@ -174,6 +175,10 @@ export const POST = withMcpAuth('write')( `[${requestId}] Added ${addedTools.length} tools to server ${serverId}:`, addedTools.map((t) => t.toolName) ) + + if (addedTools.length > 0) { + mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId }) + } } logger.info( diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx index 8f03f4b2e5..9990c3eebb 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx @@ -62,7 +62,12 @@ import { type CustomTool as CustomToolDefinition, useCustomTools, } from '@/hooks/queries/custom-tools' -import { useForceRefreshMcpTools, useMcpServers, useStoredMcpTools } from '@/hooks/queries/mcp' +import { + useForceRefreshMcpTools, + useMcpServers, + useMcpToolsEvents, + useStoredMcpTools, +} from '@/hooks/queries/mcp' import { useChildDeploymentStatus, useDeployChildWorkflow, @@ -1035,6 +1040,7 @@ export const ToolInput = memo(function ToolInput({ const { data: mcpServers = [], isLoading: mcpServersLoading } = useMcpServers(workspaceId) const { data: storedMcpTools = [] } = useStoredMcpTools(workspaceId) const forceRefreshMcpTools = useForceRefreshMcpTools() + useMcpToolsEvents(workspaceId) const openSettingsModal = useSettingsModalStore((state) => state.openModal) const mcpDataLoading = mcpLoading || mcpServersLoading diff --git a/apps/sim/hooks/queries/mcp.ts b/apps/sim/hooks/queries/mcp.ts index 5ef4170d36..607cb5e1ec 100644 --- a/apps/sim/hooks/queries/mcp.ts +++ b/apps/sim/hooks/queries/mcp.ts @@ -1,3 +1,4 @@ +import { useEffect } from 'react' import { createLogger } from '@sim/logger' import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query' import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/shared' @@ -359,3 +360,65 @@ export function useStoredMcpTools(workspaceId: string) { staleTime: 60 * 1000, }) } + +/** + * Shared EventSource connections keyed by workspaceId. + * Reference-counted so the connection is closed when the last consumer unmounts. + * Attached to `globalThis` so connections survive HMR in development. + */ +const SSE_KEY = '__mcp_sse_connections' as const + +type SseEntry = { source: EventSource; refs: number } + +const sseConnections: Map = + ((globalThis as Record)[SSE_KEY] as Map) ?? + ((globalThis as Record)[SSE_KEY] = new Map()) + +/** + * Subscribe to MCP tool-change SSE events for a workspace. + * On each `tools_changed` event, invalidates the relevant React Query caches + * so the UI refreshes automatically. + */ +export function useMcpToolsEvents(workspaceId: string) { + const queryClient = useQueryClient() + + useEffect(() => { + if (!workspaceId) return + + const invalidate = () => { + queryClient.invalidateQueries({ queryKey: mcpKeys.tools(workspaceId) }) + queryClient.invalidateQueries({ queryKey: mcpKeys.servers(workspaceId) }) + queryClient.invalidateQueries({ queryKey: mcpKeys.storedTools(workspaceId) }) + } + + let entry = sseConnections.get(workspaceId) + + if (!entry) { + const source = new EventSource(`/api/mcp/events?workspaceId=${workspaceId}`) + + source.addEventListener('tools_changed', () => { + invalidate() + }) + + source.onerror = () => { + logger.warn(`SSE connection error for workspace ${workspaceId}`) + } + + entry = { source, refs: 0 } + sseConnections.set(workspaceId, entry) + } + + entry.refs++ + + return () => { + const current = sseConnections.get(workspaceId) + if (!current) return + + current.refs-- + if (current.refs <= 0) { + current.source.close() + sseConnections.delete(workspaceId) + } + } + }, [workspaceId, queryClient]) +} diff --git a/apps/sim/lib/mcp/pubsub.ts b/apps/sim/lib/mcp/pubsub.ts index b7e6097c67..2db2bb337d 100644 --- a/apps/sim/lib/mcp/pubsub.ts +++ b/apps/sim/lib/mcp/pubsub.ts @@ -15,18 +15,13 @@ import { EventEmitter } from 'events' import { createLogger } from '@sim/logger' import Redis from 'ioredis' import { env } from '@/lib/core/config/env' -import type { ToolsChangedEvent } from '@/lib/mcp/types' +import type { ToolsChangedEvent, WorkflowToolsChangedEvent } from '@/lib/mcp/types' const logger = createLogger('McpPubSub') const CHANNEL_TOOLS_CHANGED = 'mcp:tools_changed' const CHANNEL_WORKFLOW_TOOLS_CHANGED = 'mcp:workflow_tools_changed' -export interface WorkflowToolsChangedEvent { - serverId: string - workspaceId: string -} - type ToolsChangedHandler = (event: ToolsChangedEvent) => void type WorkflowToolsChangedHandler = (event: WorkflowToolsChangedEvent) => void diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index 64001b50c6..e38cfb3f02 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -9,6 +9,7 @@ import { and, eq, isNull } from 'drizzle-orm' import { isTest } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' import { McpClient } from '@/lib/mcp/client' +import { mcpConnectionManager } from '@/lib/mcp/connection-manager' import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config' import { createMcpCacheAdapter, @@ -31,16 +32,24 @@ const logger = createLogger('McpService') class McpService { private cacheAdapter: McpCacheStorageAdapter private readonly cacheTimeout = MCP_CONSTANTS.CACHE_TIMEOUT + private unsubscribeConnectionManager?: () => void constructor() { this.cacheAdapter = createMcpCacheAdapter() logger.info(`MCP Service initialized with ${getMcpCacheType()} cache`) + + if (mcpConnectionManager) { + this.unsubscribeConnectionManager = mcpConnectionManager.subscribe((event) => { + this.clearCache(event.workspaceId) + }) + } } /** * Dispose of the service and cleanup resources */ dispose(): void { + this.unsubscribeConnectionManager?.() this.cacheAdapter.dispose() logger.info('MCP Service disposed') } @@ -328,7 +337,7 @@ class McpService { logger.debug( `[${requestId}] Discovered ${tools.length} tools from server ${config.name}` ) - return { serverId: config.id, tools } + return { serverId: config.id, tools, resolvedConfig } } finally { await client.disconnect() } @@ -364,6 +373,21 @@ class McpService { logger.error(`[${requestId}] Error updating server statuses:`, err) }) + // Fire-and-forget persistent connections for servers that support listChanged + if (mcpConnectionManager) { + for (const [index, result] of results.entries()) { + if (result.status === 'fulfilled') { + const { resolvedConfig } = result.value + mcpConnectionManager.connect(resolvedConfig, userId, workspaceId).catch((err) => { + logger.warn( + `[${requestId}] Persistent connection failed for ${servers[index].name}:`, + err + ) + }) + } + } + } + if (failedCount === 0) { try { await this.cacheAdapter.set(cacheKey, allTools, this.cacheTimeout) diff --git a/apps/sim/lib/mcp/types.ts b/apps/sim/lib/mcp/types.ts index a0a2848df9..b7e0d838ef 100644 --- a/apps/sim/lib/mcp/types.ts +++ b/apps/sim/lib/mcp/types.ts @@ -185,6 +185,14 @@ export interface ManagedConnectionState { lastActivity: number } +/** + * Event emitted when workflow CRUD modifies a workflow MCP server's tools. + */ +export interface WorkflowToolsChangedEvent { + serverId: string + workspaceId: string +} + export interface McpApiResponse { success: boolean data?: T From ec6910306d7dc826d65bfc752549b6916ce0342c Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 9 Feb 2026 11:30:08 -0800 Subject: [PATCH 4/7] ack commetns --- apps/sim/lib/mcp/connection-manager.ts | 38 +++++++++++++++++++++----- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/apps/sim/lib/mcp/connection-manager.ts b/apps/sim/lib/mcp/connection-manager.ts index 08635f4b4a..fefe4a5cb8 100644 --- a/apps/sim/lib/mcp/connection-manager.ts +++ b/apps/sim/lib/mcp/connection-manager.ts @@ -305,20 +305,21 @@ class McpConnectionManager { if (this.disposed) return - try { - this.connections.delete(serverId) - this.states.delete(serverId) + const attempts = state.reconnectAttempts + this.connections.delete(serverId) + this.states.delete(serverId) + try { const result = await this.connect(config, userId, workspaceId) if (result.supportsListChanged) { - const newState = this.states.get(serverId) - if (newState) { - newState.reconnectAttempts = 0 - } logger.info(`[${config.name}] Reconnected successfully`) + } else { + this.restoreReconnectState(config, userId, workspaceId, attempts) + this.scheduleReconnect(config, userId, workspaceId) } } catch (error) { logger.error(`[${config.name}] Reconnect failed:`, error) + this.restoreReconnectState(config, userId, workspaceId, attempts) this.scheduleReconnect(config, userId, workspaceId) } }, delay) @@ -334,6 +335,29 @@ class McpConnectionManager { } } + /** + * Restore minimal state so `scheduleReconnect` can check attempts and continue the retry loop. + */ + private restoreReconnectState( + config: McpServerConfig, + userId: string, + workspaceId: string, + reconnectAttempts: number + ): void { + if (!this.states.has(config.id)) { + this.states.set(config.id, { + serverId: config.id, + serverName: config.name, + workspaceId, + userId, + connected: false, + supportsListChanged: false, + reconnectAttempts, + lastActivity: Date.now(), + }) + } + } + private ensureIdleCheck(): void { if (this.idleCheckTimer) return From 3de6e3b40c4e99aa7f12aeeac4eecd6822e87f69 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 9 Feb 2026 11:52:56 -0800 Subject: [PATCH 5/7] fix reconnect timer --- apps/sim/lib/mcp/connection-manager.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/apps/sim/lib/mcp/connection-manager.ts b/apps/sim/lib/mcp/connection-manager.ts index fefe4a5cb8..d25670e9d9 100644 --- a/apps/sim/lib/mcp/connection-manager.ts +++ b/apps/sim/lib/mcp/connection-manager.ts @@ -125,6 +125,8 @@ class McpConnectionManager { return { supportsListChanged: false } } + this.clearReconnectTimer(serverId) + this.connections.set(serverId, client) this.states.set(serverId, { serverId, @@ -305,6 +307,14 @@ class McpConnectionManager { if (this.disposed) return + const currentState = this.states.get(serverId) + if (currentState?.connected) { + logger.info( + `[${config.name}] Connection already re-established externally, skipping reconnect` + ) + return + } + const attempts = state.reconnectAttempts this.connections.delete(serverId) this.states.delete(serverId) From 4d53e3543916ab333873cb8d62e3b4bc678bf661 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 9 Feb 2026 12:11:02 -0800 Subject: [PATCH 6/7] cleanup when running onClose --- apps/sim/lib/mcp/client.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/mcp/client.ts b/apps/sim/lib/mcp/client.ts index 02395ed8fc..f26adf33b7 100644 --- a/apps/sim/lib/mcp/client.ts +++ b/apps/sim/lib/mcp/client.ts @@ -281,9 +281,14 @@ export class McpClient { /** * Register a callback to be invoked when the underlying transport closes. * Used by the connection manager for reconnection logic. + * Chains with the SDK's internal onclose handler so it still performs its cleanup. */ onClose(callback: () => void): void { - this.transport.onclose = callback + const existingHandler = this.transport.onclose + this.transport.onclose = () => { + existingHandler?.() + callback() + } } /** From 0ba38ce32c96297a110959b95cc61eec3e06cd38 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 9 Feb 2026 12:20:28 -0800 Subject: [PATCH 7/7] fixed spacing on mcp settings tab --- .../components/settings-modal/components/mcp/mcp.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/mcp/mcp.tsx b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/mcp/mcp.tsx index d25865a743..4295dd59d9 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/mcp/mcp.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/mcp/mcp.tsx @@ -894,14 +894,14 @@ export function MCP({ initialServerId }: MCPProps) { disabled={!hasParams} >
-
-

+

+

{tool.name}

{issues.length > 0 && ( -
+