Skip to content

Commit 8b4b3af

Browse files
authored
fix(mcp): harden notification system against race conditions (#3168)
* fix(mcp): harden notification system against race conditions - Guard concurrent connect() calls in connection manager with connectingServers Set - Suppress post-disconnect notification handler firing in MCP client - Clean up Redis event listeners in pub/sub dispose() - Add tests for all three hardening fixes (11 new tests) * updated tests * plugged in new mcp event based system and create sse route to publish notifs * ack commetns * fix reconnect timer * cleanup when running onClose * fixed spacing on mcp settings tab * keep error listeners before quiet in redis
1 parent 190f12f commit 8b4b3af

File tree

17 files changed

+1421
-23
lines changed

17 files changed

+1421
-23
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Tests for MCP SSE events endpoint
3+
*
4+
* @vitest-environment node
5+
*/
6+
import { createMockRequest, mockAuth, mockConsoleLogger } from '@sim/testing'
7+
import { beforeEach, describe, expect, it, vi } from 'vitest'
8+
9+
mockConsoleLogger()
10+
const auth = mockAuth()
11+
12+
const mockGetUserEntityPermissions = vi.fn()
13+
vi.doMock('@/lib/workspaces/permissions/utils', () => ({
14+
getUserEntityPermissions: mockGetUserEntityPermissions,
15+
}))
16+
17+
vi.doMock('@/lib/mcp/connection-manager', () => ({
18+
mcpConnectionManager: null,
19+
}))
20+
21+
vi.doMock('@/lib/mcp/pubsub', () => ({
22+
mcpPubSub: null,
23+
}))
24+
25+
const { GET } = await import('./route')
26+
27+
describe('MCP Events SSE Endpoint', () => {
28+
beforeEach(() => {
29+
vi.clearAllMocks()
30+
})
31+
32+
it('returns 401 when session is missing', async () => {
33+
auth.setUnauthenticated()
34+
35+
const request = createMockRequest(
36+
'GET',
37+
undefined,
38+
{},
39+
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
40+
)
41+
42+
const response = await GET(request as any)
43+
44+
expect(response.status).toBe(401)
45+
const text = await response.text()
46+
expect(text).toBe('Unauthorized')
47+
})
48+
49+
it('returns 400 when workspaceId is missing', async () => {
50+
auth.setAuthenticated()
51+
52+
const request = createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/mcp/events')
53+
54+
const response = await GET(request as any)
55+
56+
expect(response.status).toBe(400)
57+
const text = await response.text()
58+
expect(text).toBe('Missing workspaceId query parameter')
59+
})
60+
61+
it('returns 403 when user lacks workspace access', async () => {
62+
auth.setAuthenticated()
63+
mockGetUserEntityPermissions.mockResolvedValue(null)
64+
65+
const request = createMockRequest(
66+
'GET',
67+
undefined,
68+
{},
69+
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
70+
)
71+
72+
const response = await GET(request as any)
73+
74+
expect(response.status).toBe(403)
75+
const text = await response.text()
76+
expect(text).toBe('Access denied to workspace')
77+
expect(mockGetUserEntityPermissions).toHaveBeenCalledWith('user-123', 'workspace', 'ws-123')
78+
})
79+
80+
it('returns SSE stream when authorized', async () => {
81+
auth.setAuthenticated()
82+
mockGetUserEntityPermissions.mockResolvedValue({ read: true })
83+
84+
const request = createMockRequest(
85+
'GET',
86+
undefined,
87+
{},
88+
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
89+
)
90+
91+
const response = await GET(request as any)
92+
93+
expect(response.status).toBe(200)
94+
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
95+
expect(response.headers.get('Cache-Control')).toBe('no-cache')
96+
expect(response.headers.get('Connection')).toBe('keep-alive')
97+
})
98+
})
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* SSE endpoint for MCP tool-change events.
3+
*
4+
* Pushes `tools_changed` events to the browser when:
5+
* - An external MCP server sends `notifications/tools/list_changed` (via connection manager)
6+
* - A workflow CRUD route modifies workflow MCP server tools (via pub/sub)
7+
*
8+
* Auth is handled via session cookies (EventSource sends cookies automatically).
9+
*/
10+
11+
import { createLogger } from '@sim/logger'
12+
import type { NextRequest } from 'next/server'
13+
import { getSession } from '@/lib/auth'
14+
import { SSE_HEADERS } from '@/lib/core/utils/sse'
15+
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
16+
import { mcpPubSub } from '@/lib/mcp/pubsub'
17+
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
18+
19+
const logger = createLogger('McpEventsSSE')
20+
21+
export const dynamic = 'force-dynamic'
22+
23+
const HEARTBEAT_INTERVAL_MS = 30_000
24+
25+
export async function GET(request: NextRequest) {
26+
const session = await getSession()
27+
if (!session?.user?.id) {
28+
return new Response('Unauthorized', { status: 401 })
29+
}
30+
31+
const { searchParams } = new URL(request.url)
32+
const workspaceId = searchParams.get('workspaceId')
33+
if (!workspaceId) {
34+
return new Response('Missing workspaceId query parameter', { status: 400 })
35+
}
36+
37+
const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
38+
if (!permissions) {
39+
return new Response('Access denied to workspace', { status: 403 })
40+
}
41+
42+
const encoder = new TextEncoder()
43+
const unsubscribers: Array<() => void> = []
44+
45+
const stream = new ReadableStream({
46+
start(controller) {
47+
const send = (eventName: string, data: Record<string, unknown>) => {
48+
try {
49+
controller.enqueue(
50+
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
51+
)
52+
} catch {
53+
// Stream already closed
54+
}
55+
}
56+
57+
// Subscribe to external MCP server tool changes
58+
if (mcpConnectionManager) {
59+
const unsub = mcpConnectionManager.subscribe((event) => {
60+
if (event.workspaceId !== workspaceId) return
61+
send('tools_changed', {
62+
source: 'external',
63+
serverId: event.serverId,
64+
timestamp: event.timestamp,
65+
})
66+
})
67+
unsubscribers.push(unsub)
68+
}
69+
70+
// Subscribe to workflow CRUD tool changes
71+
if (mcpPubSub) {
72+
const unsub = mcpPubSub.onWorkflowToolsChanged((event) => {
73+
if (event.workspaceId !== workspaceId) return
74+
send('tools_changed', {
75+
source: 'workflow',
76+
serverId: event.serverId,
77+
timestamp: Date.now(),
78+
})
79+
})
80+
unsubscribers.push(unsub)
81+
}
82+
83+
// Heartbeat to keep the connection alive
84+
const heartbeat = setInterval(() => {
85+
try {
86+
controller.enqueue(encoder.encode(': heartbeat\n\n'))
87+
} catch {
88+
clearInterval(heartbeat)
89+
}
90+
}, HEARTBEAT_INTERVAL_MS)
91+
unsubscribers.push(() => clearInterval(heartbeat))
92+
93+
// Cleanup when client disconnects
94+
request.signal.addEventListener('abort', () => {
95+
for (const unsub of unsubscribers) {
96+
unsub()
97+
}
98+
try {
99+
controller.close()
100+
} catch {
101+
// Already closed
102+
}
103+
logger.info(`SSE connection closed for workspace ${workspaceId}`)
104+
})
105+
106+
logger.info(`SSE connection opened for workspace ${workspaceId}`)
107+
},
108+
})
109+
110+
return new Response(stream, { headers: SSE_HEADERS })
111+
}

apps/sim/app/api/mcp/workflow-servers/[id]/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
44
import { and, eq } from 'drizzle-orm'
55
import type { NextRequest } from 'next/server'
66
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
7+
import { mcpPubSub } from '@/lib/mcp/pubsub'
78
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
89

910
const logger = createLogger('WorkflowMcpServerAPI')
@@ -146,6 +147,8 @@ export const DELETE = withMcpAuth<RouteParams>('admin')(
146147

147148
logger.info(`[${requestId}] Successfully deleted workflow MCP server: ${serverId}`)
148149

150+
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
151+
149152
return createMcpSuccessResponse({ message: `Server ${serverId} deleted successfully` })
150153
} catch (error) {
151154
logger.error(`[${requestId}] Error deleting workflow MCP server:`, error)

apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
44
import { and, eq } from 'drizzle-orm'
55
import type { NextRequest } from 'next/server'
66
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
7+
import { mcpPubSub } from '@/lib/mcp/pubsub'
78
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
89
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
910

@@ -115,6 +116,8 @@ export const PATCH = withMcpAuth<RouteParams>('write')(
115116

116117
logger.info(`[${requestId}] Successfully updated tool ${toolId}`)
117118

119+
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
120+
118121
return createMcpSuccessResponse({ tool: updatedTool })
119122
} catch (error) {
120123
logger.error(`[${requestId}] Error updating tool:`, error)
@@ -160,6 +163,8 @@ export const DELETE = withMcpAuth<RouteParams>('write')(
160163

161164
logger.info(`[${requestId}] Successfully deleted tool ${toolId}`)
162165

166+
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
167+
163168
return createMcpSuccessResponse({ message: `Tool ${toolId} deleted successfully` })
164169
} catch (error) {
165170
logger.error(`[${requestId}] Error deleting tool:`, error)

apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
44
import { and, eq } from 'drizzle-orm'
55
import type { NextRequest } from 'next/server'
66
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
7+
import { mcpPubSub } from '@/lib/mcp/pubsub'
78
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
89
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
910
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
@@ -188,6 +189,8 @@ export const POST = withMcpAuth<RouteParams>('write')(
188189
`[${requestId}] Successfully added tool ${toolName} (workflow: ${body.workflowId}) to server ${serverId}`
189190
)
190191

192+
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
193+
191194
return createMcpSuccessResponse({ tool }, 201)
192195
} catch (error) {
193196
logger.error(`[${requestId}] Error adding tool:`, error)

apps/sim/app/api/mcp/workflow-servers/route.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
44
import { eq, inArray, sql } from 'drizzle-orm'
55
import type { NextRequest } from 'next/server'
66
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
7+
import { mcpPubSub } from '@/lib/mcp/pubsub'
78
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
89
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
910
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
@@ -174,6 +175,10 @@ export const POST = withMcpAuth('write')(
174175
`[${requestId}] Added ${addedTools.length} tools to server ${serverId}:`,
175176
addedTools.map((t) => t.toolName)
176177
)
178+
179+
if (addedTools.length > 0) {
180+
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
181+
}
177182
}
178183

179184
logger.info(

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/tool-input.tsx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,12 @@ import {
6262
type CustomTool as CustomToolDefinition,
6363
useCustomTools,
6464
} from '@/hooks/queries/custom-tools'
65-
import { useForceRefreshMcpTools, useMcpServers, useStoredMcpTools } from '@/hooks/queries/mcp'
65+
import {
66+
useForceRefreshMcpTools,
67+
useMcpServers,
68+
useMcpToolsEvents,
69+
useStoredMcpTools,
70+
} from '@/hooks/queries/mcp'
6671
import {
6772
useChildDeploymentStatus,
6873
useDeployChildWorkflow,
@@ -1035,6 +1040,7 @@ export const ToolInput = memo(function ToolInput({
10351040
const { data: mcpServers = [], isLoading: mcpServersLoading } = useMcpServers(workspaceId)
10361041
const { data: storedMcpTools = [] } = useStoredMcpTools(workspaceId)
10371042
const forceRefreshMcpTools = useForceRefreshMcpTools()
1043+
useMcpToolsEvents(workspaceId)
10381044
const openSettingsModal = useSettingsModalStore((state) => state.openModal)
10391045
const mcpDataLoading = mcpLoading || mcpServersLoading
10401046

apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/mcp/mcp.tsx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -894,14 +894,14 @@ export function MCP({ initialServerId }: MCPProps) {
894894
disabled={!hasParams}
895895
>
896896
<div className='flex-1'>
897-
<div className='flex items-center gap-[8px]'>
898-
<p className='font-medium text-[13px] text-[var(--text-primary)]'>
897+
<div className='flex h-[16px] items-center gap-[6px]'>
898+
<p className='font-medium text-[13px] text-[var(--text-primary)] leading-none'>
899899
{tool.name}
900900
</p>
901901
{issues.length > 0 && (
902902
<Tooltip.Root>
903903
<Tooltip.Trigger asChild>
904-
<div>
904+
<div className='flex items-center'>
905905
<Badge
906906
variant={getIssueBadgeVariant(issues[0].issue)}
907907
size='sm'

0 commit comments

Comments
 (0)