Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions apps/sim/app/api/mcp/events/route.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Tests for MCP SSE events endpoint
*
* @vitest-environment node
*/
import { createMockRequest, mockAuth, mockConsoleLogger } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

mockConsoleLogger()
const auth = mockAuth()

const mockGetUserEntityPermissions = vi.fn()
vi.doMock('@/lib/workspaces/permissions/utils', () => ({
getUserEntityPermissions: mockGetUserEntityPermissions,
}))

vi.doMock('@/lib/mcp/connection-manager', () => ({
mcpConnectionManager: null,
}))

vi.doMock('@/lib/mcp/pubsub', () => ({
mcpPubSub: null,
}))

const { GET } = await import('./route')

describe('MCP Events SSE Endpoint', () => {
beforeEach(() => {
vi.clearAllMocks()
})

it('returns 401 when session is missing', async () => {
auth.setUnauthenticated()

const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)

const response = await GET(request as any)

expect(response.status).toBe(401)
const text = await response.text()
expect(text).toBe('Unauthorized')
})

it('returns 400 when workspaceId is missing', async () => {
auth.setAuthenticated()

const request = createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/mcp/events')

const response = await GET(request as any)

expect(response.status).toBe(400)
const text = await response.text()
expect(text).toBe('Missing workspaceId query parameter')
})

it('returns 403 when user lacks workspace access', async () => {
auth.setAuthenticated()
mockGetUserEntityPermissions.mockResolvedValue(null)

const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)

const response = await GET(request as any)

expect(response.status).toBe(403)
const text = await response.text()
expect(text).toBe('Access denied to workspace')
expect(mockGetUserEntityPermissions).toHaveBeenCalledWith('user-123', 'workspace', 'ws-123')
})

it('returns SSE stream when authorized', async () => {
auth.setAuthenticated()
mockGetUserEntityPermissions.mockResolvedValue({ read: true })

const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)

const response = await GET(request as any)

expect(response.status).toBe(200)
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
expect(response.headers.get('Cache-Control')).toBe('no-cache')
expect(response.headers.get('Connection')).toBe('keep-alive')
})
})
111 changes: 111 additions & 0 deletions apps/sim/app/api/mcp/events/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* SSE endpoint for MCP tool-change events.
*
* Pushes `tools_changed` events to the browser when:
* - An external MCP server sends `notifications/tools/list_changed` (via connection manager)
* - A workflow CRUD route modifies workflow MCP server tools (via pub/sub)
*
* Auth is handled via session cookies (EventSource sends cookies automatically).
*/

import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server'
import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('McpEventsSSE')

export const dynamic = 'force-dynamic'

const HEARTBEAT_INTERVAL_MS = 30_000

export async function GET(request: NextRequest) {
const session = await getSession()
if (!session?.user?.id) {
return new Response('Unauthorized', { status: 401 })
}

const { searchParams } = new URL(request.url)
const workspaceId = searchParams.get('workspaceId')
if (!workspaceId) {
return new Response('Missing workspaceId query parameter', { status: 400 })
}

const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
if (!permissions) {
return new Response('Access denied to workspace', { status: 403 })
}

const encoder = new TextEncoder()
const unsubscribers: Array<() => void> = []

const stream = new ReadableStream({
start(controller) {
const send = (eventName: string, data: Record<string, unknown>) => {
try {
controller.enqueue(
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
)
} catch {
// Stream already closed
}
}

// Subscribe to external MCP server tool changes
if (mcpConnectionManager) {
const unsub = mcpConnectionManager.subscribe((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'external',
serverId: event.serverId,
timestamp: event.timestamp,
})
})
unsubscribers.push(unsub)
}

// Subscribe to workflow CRUD tool changes
if (mcpPubSub) {
const unsub = mcpPubSub.onWorkflowToolsChanged((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'workflow',
serverId: event.serverId,
timestamp: Date.now(),
})
})
unsubscribers.push(unsub)
}

// Heartbeat to keep the connection alive
const heartbeat = setInterval(() => {
try {
controller.enqueue(encoder.encode(': heartbeat\n\n'))
} catch {
clearInterval(heartbeat)
}
}, HEARTBEAT_INTERVAL_MS)
unsubscribers.push(() => clearInterval(heartbeat))

// Cleanup when client disconnects
request.signal.addEventListener('abort', () => {
for (const unsub of unsubscribers) {
unsub()
}
try {
controller.close()
} catch {
// Already closed
}
logger.info(`SSE connection closed for workspace ${workspaceId}`)
})

logger.info(`SSE connection opened for workspace ${workspaceId}`)
},
})

return new Response(stream, { headers: SSE_HEADERS })
}
3 changes: 3 additions & 0 deletions apps/sim/app/api/mcp/workflow-servers/[id]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'

const logger = createLogger('WorkflowMcpServerAPI')
Expand Down Expand Up @@ -146,6 +147,8 @@ export const DELETE = withMcpAuth<RouteParams>('admin')(

logger.info(`[${requestId}] Successfully deleted workflow MCP server: ${serverId}`)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ message: `Server ${serverId} deleted successfully` })
} catch (error) {
logger.error(`[${requestId}] Error deleting workflow MCP server:`, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'

Expand Down Expand Up @@ -115,6 +116,8 @@ export const PATCH = withMcpAuth<RouteParams>('write')(

logger.info(`[${requestId}] Successfully updated tool ${toolId}`)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ tool: updatedTool })
} catch (error) {
logger.error(`[${requestId}] Error updating tool:`, error)
Expand Down Expand Up @@ -160,6 +163,8 @@ export const DELETE = withMcpAuth<RouteParams>('write')(

logger.info(`[${requestId}] Successfully deleted tool ${toolId}`)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ message: `Tool ${toolId} deleted successfully` })
} catch (error) {
logger.error(`[${requestId}] Error deleting tool:`, error)
Expand Down
3 changes: 3 additions & 0 deletions apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
Expand Down Expand Up @@ -188,6 +189,8 @@ export const POST = withMcpAuth<RouteParams>('write')(
`[${requestId}] Successfully added tool ${toolName} (workflow: ${body.workflowId}) to server ${serverId}`
)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ tool }, 201)
} catch (error) {
logger.error(`[${requestId}] Error adding tool:`, error)
Expand Down
5 changes: 5 additions & 0 deletions apps/sim/app/api/mcp/workflow-servers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { eq, inArray, sql } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
Expand Down Expand Up @@ -174,6 +175,10 @@ export const POST = withMcpAuth('write')(
`[${requestId}] Added ${addedTools.length} tools to server ${serverId}:`,
addedTools.map((t) => t.toolName)
)

if (addedTools.length > 0) {
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
}
}

logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ import {
type CustomTool as CustomToolDefinition,
useCustomTools,
} from '@/hooks/queries/custom-tools'
import { useForceRefreshMcpTools, useMcpServers, useStoredMcpTools } from '@/hooks/queries/mcp'
import {
useForceRefreshMcpTools,
useMcpServers,
useMcpToolsEvents,
useStoredMcpTools,
} from '@/hooks/queries/mcp'
import {
useChildDeploymentStatus,
useDeployChildWorkflow,
Expand Down Expand Up @@ -1035,6 +1040,7 @@ export const ToolInput = memo(function ToolInput({
const { data: mcpServers = [], isLoading: mcpServersLoading } = useMcpServers(workspaceId)
const { data: storedMcpTools = [] } = useStoredMcpTools(workspaceId)
const forceRefreshMcpTools = useForceRefreshMcpTools()
useMcpToolsEvents(workspaceId)
const openSettingsModal = useSettingsModalStore((state) => state.openModal)
const mcpDataLoading = mcpLoading || mcpServersLoading

Expand Down
63 changes: 63 additions & 0 deletions apps/sim/hooks/queries/mcp.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { useEffect } from 'react'
import { createLogger } from '@sim/logger'
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/shared'
Expand Down Expand Up @@ -359,3 +360,65 @@ export function useStoredMcpTools(workspaceId: string) {
staleTime: 60 * 1000,
})
}

/**
* Shared EventSource connections keyed by workspaceId.
* Reference-counted so the connection is closed when the last consumer unmounts.
* Attached to `globalThis` so connections survive HMR in development.
*/
const SSE_KEY = '__mcp_sse_connections' as const

type SseEntry = { source: EventSource; refs: number }

const sseConnections: Map<string, SseEntry> =
((globalThis as Record<string, unknown>)[SSE_KEY] as Map<string, SseEntry>) ??
((globalThis as Record<string, unknown>)[SSE_KEY] = new Map<string, SseEntry>())

/**
* Subscribe to MCP tool-change SSE events for a workspace.
* On each `tools_changed` event, invalidates the relevant React Query caches
* so the UI refreshes automatically.
*/
export function useMcpToolsEvents(workspaceId: string) {
const queryClient = useQueryClient()

useEffect(() => {
if (!workspaceId) return

const invalidate = () => {
queryClient.invalidateQueries({ queryKey: mcpKeys.tools(workspaceId) })
queryClient.invalidateQueries({ queryKey: mcpKeys.servers(workspaceId) })
queryClient.invalidateQueries({ queryKey: mcpKeys.storedTools(workspaceId) })
}

let entry = sseConnections.get(workspaceId)

if (!entry) {
const source = new EventSource(`/api/mcp/events?workspaceId=${workspaceId}`)

source.addEventListener('tools_changed', () => {
invalidate()
})

source.onerror = () => {
logger.warn(`SSE connection error for workspace ${workspaceId}`)
}

entry = { source, refs: 0 }
sseConnections.set(workspaceId, entry)
}

entry.refs++

return () => {
const current = sseConnections.get(workspaceId)
if (!current) return

current.refs--
if (current.refs <= 0) {
current.source.close()
sseConnections.delete(workspaceId)
}
}
}, [workspaceId, queryClient])
}
Loading