Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/gentle-maps-smile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/client': patch
---

Clear stale Streamable HTTP client sessions when a session-bound request receives HTTP 404 by clearing the stored session ID, so the next initialize flow can proceed without an MCP session header.
33 changes: 31 additions & 2 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOp
maxRetries: 2
};

const SESSION_BOUND_404_ERROR = Symbol('sessionBound404Error');

type SessionBound404Error = Error & { [SESSION_BOUND_404_ERROR]?: true };

function markSessionBound404Error(error: Error): Error {
(error as SessionBound404Error)[SESSION_BOUND_404_ERROR] = true;
return error;
}

function isSessionBound404Error(error: unknown): boolean {
return Boolean(error && typeof error === 'object' && (error as SessionBound404Error)[SESSION_BOUND_404_ERROR] === true);
}

/**
* Options for starting or authenticating an SSE connection
*/
Expand Down Expand Up @@ -237,6 +250,7 @@ export class StreamableHTTPClientTransport implements Transport {
// Try to open an initial SSE stream with GET to listen for server messages
// This is optional according to the spec - server may not support it
const headers = await this._commonHeaders();
const sentSessionId = headers.get('mcp-session-id');
const userAccept = headers.get('accept');
const types = [...(userAccept?.split(',').map(s => s.trim().toLowerCase()) ?? []), 'text/event-stream'];
headers.set('accept', [...new Set(types)].join(', '));
Expand All @@ -254,6 +268,11 @@ export class StreamableHTTPClientTransport implements Transport {
});

if (!response.ok) {
const shouldClearSessionFor404 = response.status === 404 && sentSessionId !== null && this._sessionId === sentSessionId;
if (shouldClearSessionFor404) {
this._sessionId = undefined;
}

if (response.status === 401 && this._authProvider) {
if (response.headers.has('www-authenticate')) {
const { resourceMetadataUrl, scope } = extractWWWAuthenticateParams(response);
Comment thread
Maverick-666 marked this conversation as resolved.
Expand Down Expand Up @@ -288,10 +307,11 @@ export class StreamableHTTPClientTransport implements Transport {
return;
}

throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${response.statusText}`, {
const error = new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${response.statusText}`, {
status: response.status,
statusText: response.statusText
});
throw shouldClearSessionFor404 ? markSessionBound404Error(error) : error;
}

this._handleSseStream(response.body, options, true);
Expand Down Expand Up @@ -345,7 +365,11 @@ export class StreamableHTTPClientTransport implements Transport {
this._cancelReconnection = undefined;
if (this._abortController?.signal.aborted) return;
this._startOrAuthSse(options).catch(error => {
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
const reconnectError = error instanceof Error ? error : new Error(String(error));
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${reconnectError.message}`));
if (isSessionBound404Error(reconnectError)) {
return;
}
try {
this._scheduleReconnection(options, attemptCount + 1);
} catch (scheduleError) {
Expand Down Expand Up @@ -539,6 +563,7 @@ export class StreamableHTTPClientTransport implements Transport {
}

const headers = await this._commonHeaders();
const sentSessionId = headers.get('mcp-session-id');
headers.set('content-type', 'application/json');
const userAccept = headers.get('accept');
const types = [...(userAccept?.split(',').map(s => s.trim().toLowerCase()) ?? []), 'application/json', 'text/event-stream'];
Expand All @@ -561,6 +586,10 @@ export class StreamableHTTPClientTransport implements Transport {
}

if (!response.ok) {
if (response.status === 404 && sentSessionId !== null && this._sessionId === sentSessionId) {
this._sessionId = undefined;
}
Comment thread
Maverick-666 marked this conversation as resolved.

if (response.status === 401 && this._authProvider) {
// Store WWW-Authenticate params for interactive finishAuth() path
if (response.headers.has('www-authenticate')) {
Comment thread
Maverick-666 marked this conversation as resolved.
Expand Down
203 changes: 202 additions & 1 deletion packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ describe('StreamableHTTPClientTransport', () => {
await expect(transport.terminateSession()).resolves.not.toThrow();
});

it('should handle 404 response when session expires', async () => {
it('should preserve existing 404 behavior when request is not session-bound', async () => {
const message: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'test',
Expand Down Expand Up @@ -248,6 +248,104 @@ describe('StreamableHTTPClientTransport', () => {
expect(errorSpy).toHaveBeenCalled();
});

it('should clear session ID on 404 for session-bound POST requests', async () => {
const initializeMessage: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'initialize',
params: {
clientInfo: { name: 'test-client', version: '1.0' },
protocolVersion: '2025-03-26'
},
id: 'init-id'
};
const message: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'tools/list',
params: {},
id: 'test-id'
};

(globalThis.fetch as Mock)
.mockResolvedValueOnce({
ok: true,
status: 202,
headers: new Headers({ 'mcp-session-id': 'stale-session-id' }),
text: () => Promise.resolve('')
})
.mockResolvedValueOnce({
ok: false,
status: 404,
statusText: 'Not Found',
text: () => Promise.resolve('Session not found'),
headers: new Headers()
})
.mockResolvedValueOnce({
ok: true,
status: 202,
headers: new Headers(),
text: () => Promise.resolve('')
});

await transport.send(initializeMessage);
expect(transport.sessionId).toBe('stale-session-id');

await expect(transport.send(message)).rejects.toMatchObject({
code: SdkErrorCode.ClientHttpNotImplemented,
data: expect.objectContaining({
status: 404,
text: 'Session not found'
})
});
expect(transport.sessionId).toBeUndefined();

await transport.send({ jsonrpc: '2.0', method: 'notifications/ping' } as JSONRPCMessage);
const lastCall = (globalThis.fetch as Mock).mock.calls.at(-1)!;
expect(lastCall[1].headers.get('mcp-session-id')).toBeNull();
});

it('should not clear a newer session ID when a stale session-bound POST request returns 404', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
sessionId: 'stale-session-A'
});

const message: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'tools/list',
params: {},
id: 'test-id'
};

let resolveFetch!: (value: unknown) => void;
const deferredFetch = new Promise(resolve => {
resolveFetch = resolve;
});

(globalThis.fetch as Mock).mockImplementationOnce(() => {
// Simulate another in-flight request establishing a fresh session while this request is pending.
(transport as unknown as { _sessionId?: string })._sessionId = 'fresh-session-B';
return deferredFetch;
});

const sendPromise = transport.send(message);

resolveFetch({
ok: false,
status: 404,
statusText: 'Not Found',
text: () => Promise.resolve('Session not found'),
headers: new Headers()
});

await expect(sendPromise).rejects.toMatchObject({
code: SdkErrorCode.ClientHttpNotImplemented,
data: expect.objectContaining({
status: 404
})
});

expect(transport.sessionId).toBe('fresh-session-B');
});

it('should handle non-streaming JSON response', async () => {
const message: JSONRPCMessage = {
jsonrpc: '2.0',
Expand Down Expand Up @@ -309,6 +407,75 @@ describe('StreamableHTTPClientTransport', () => {
expect(globalThis.fetch).toHaveBeenCalledTimes(2);
});

it('should clear session ID when GET SSE stream returns 404 for a session-bound request', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
sessionId: 'stale-session-id'
});
await transport.start();

(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 404,
statusText: 'Not Found',
text: () => Promise.resolve('Session not found'),
headers: new Headers()
});

await expect(
(transport as unknown as { _startOrAuthSse: (opts: StartSSEOptions) => Promise<void> })._startOrAuthSse({})
).rejects.toMatchObject({
code: SdkErrorCode.ClientHttpFailedToOpenStream,
data: expect.objectContaining({
status: 404,
statusText: 'Not Found'
})
});

expect(transport.sessionId).toBeUndefined();

const getCall = (globalThis.fetch as Mock).mock.calls[0]!;
expect(getCall[1].method).toBe('GET');
expect(getCall[1].headers.get('mcp-session-id')).toBe('stale-session-id');
});

it('should not clear a newer session ID when a stale session-bound GET request returns 404', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
sessionId: 'stale-session-A'
});
await transport.start();

let resolveFetch!: (value: unknown) => void;
const deferredFetch = new Promise(resolve => {
resolveFetch = resolve;
});

(globalThis.fetch as Mock).mockImplementationOnce(() => {
// Simulate another in-flight request establishing a fresh session while this request is pending.
(transport as unknown as { _sessionId?: string })._sessionId = 'fresh-session-B';
return deferredFetch;
});

const startPromise = (transport as unknown as { _startOrAuthSse: (opts: StartSSEOptions) => Promise<void> })._startOrAuthSse({});

resolveFetch({
ok: false,
status: 404,
statusText: 'Not Found',
text: () => Promise.resolve('Session not found'),
headers: new Headers()
});

await expect(startPromise).rejects.toMatchObject({
code: SdkErrorCode.ClientHttpFailedToOpenStream,
data: expect.objectContaining({
status: 404,
statusText: 'Not Found'
})
});

expect(transport.sessionId).toBe('fresh-session-B');
});

it('should handle successful initial GET connection for SSE', async () => {
// Set up readable stream for SSE events
const encoder = new TextEncoder();
Expand Down Expand Up @@ -936,6 +1103,40 @@ describe('StreamableHTTPClientTransport', () => {
expect(fetchMock.mock.calls[1]![1]?.method).toBe('GET');
});

it('should stop retrying GET reconnection after a session-bound 404 clears the stale session', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
sessionId: 'stale-session-id',
reconnectionOptions: {
initialReconnectionDelay: 10,
maxRetries: 3,
maxReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1
}
});
await transport.start();

const fetchMock = globalThis.fetch as Mock;
fetchMock.mockResolvedValue({
ok: false,
status: 404,
statusText: 'Not Found',
headers: new Headers(),
text: () => Promise.resolve('Session not found')
});

(
transport as unknown as {
_scheduleReconnection: (opts: StartSSEOptions, attemptCount?: number) => void;
}
)._scheduleReconnection({}, 0);

await vi.advanceTimersByTimeAsync(20);
await vi.advanceTimersByTimeAsync(100);

expect(fetchMock).toHaveBeenCalledTimes(1);
expect(transport.sessionId).toBeUndefined();
});

it('should NOT reconnect a POST-initiated stream that fails', async () => {
// ARRANGE
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
Expand Down
Loading