From 980d7250f4d6aea3035e1f530084959b49150153 Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Sun, 24 May 2026 17:47:26 +0800 Subject: [PATCH 1/2] fix: reinitialize expired streamable sessions --- packages/client/src/client/client.ts | 91 ++++++++++-------- packages/client/src/client/streamableHttp.ts | 19 +++- .../client/test/client/streamableHttp.test.ts | 93 ++++++++++++++++++- 3 files changed, 160 insertions(+), 43 deletions(-) diff --git a/packages/client/src/client/client.ts b/packages/client/src/client/client.ts index 36a98521cd..7266d41215 100644 --- a/packages/client/src/client/client.ts +++ b/packages/client/src/client/client.ts @@ -179,6 +179,10 @@ export type ClientOptions = ProtocolOptions & { listChanged?: ListChangedHandlers; }; +type SessionExpiringTransport = Transport & { + onsessionexpired?: () => void | Promise; +}; + /** * An MCP client on top of a pluggable transport. * @@ -410,6 +414,13 @@ export class Client extends Protocol { */ override async connect(transport: Transport, options?: RequestOptions): Promise { await super.connect(transport); + (transport as SessionExpiringTransport).onsessionexpired = async () => { + this._serverCapabilities = undefined; + this._serverVersion = undefined; + this._negotiatedProtocolVersion = undefined; + await this._initialize(transport, options); + }; + // When transport sessionId is already set this means we are trying to reconnect. // Restore the protocol version negotiated during the original initialize handshake // so HTTP transports include the required mcp-protocol-version header, but skip re-init. @@ -420,50 +431,54 @@ export class Client extends Protocol { return; } try { - const result = await this._requestWithSchema( - { - method: 'initialize', - params: { - protocolVersion: this._supportedProtocolVersions[0] ?? LATEST_PROTOCOL_VERSION, - capabilities: this._capabilities, - clientInfo: this._clientInfo - } - }, - InitializeResultSchema, - options - ); + await this._initialize(transport, options); + } catch (error) { + // Disconnect if initialization fails. + void this.close(); + throw error; + } + } - if (result === undefined) { - throw new Error(`Server sent invalid initialize result: ${result}`); - } + private async _initialize(transport: Transport, options?: RequestOptions): Promise { + const result = await this._requestWithSchema( + { + method: 'initialize', + params: { + protocolVersion: this._supportedProtocolVersions[0] ?? LATEST_PROTOCOL_VERSION, + capabilities: this._capabilities, + clientInfo: this._clientInfo + } + }, + InitializeResultSchema, + options + ); - if (!this._supportedProtocolVersions.includes(result.protocolVersion)) { - throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`); - } + if (result === undefined) { + throw new Error(`Server sent invalid initialize result: ${result}`); + } - this._serverCapabilities = result.capabilities; - this._serverVersion = result.serverInfo; - this._negotiatedProtocolVersion = result.protocolVersion; - // HTTP transports must set the protocol version in each header after initialization. - if (transport.setProtocolVersion) { - transport.setProtocolVersion(result.protocolVersion); - } + if (!this._supportedProtocolVersions.includes(result.protocolVersion)) { + throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`); + } - this._instructions = result.instructions; + this._serverCapabilities = result.capabilities; + this._serverVersion = result.serverInfo; + this._negotiatedProtocolVersion = result.protocolVersion; + // HTTP transports must set the protocol version in each header after initialization. + if (transport.setProtocolVersion) { + transport.setProtocolVersion(result.protocolVersion); + } - await this.notification({ - method: 'notifications/initialized' - }); + this._instructions = result.instructions; - // Set up list changed handlers now that we know server capabilities - if (this._pendingListChangedConfig) { - this._setupListChangedHandlers(this._pendingListChangedConfig); - this._pendingListChangedConfig = undefined; - } - } catch (error) { - // Disconnect if initialization fails. - void this.close(); - throw error; + await this.notification({ + method: 'notifications/initialized' + }); + + // Set up list changed handlers now that we know server capabilities + if (this._pendingListChangedConfig) { + this._setupListChangedHandlers(this._pendingListChangedConfig); + this._pendingListChangedConfig = undefined; } } diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index 3b8ddafe5a..fd3b7edd46 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -189,6 +189,7 @@ export class StreamableHTTPClientTransport implements Transport { onclose?: () => void; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; + onsessionexpired?: () => void | Promise; constructor(url: URL, opts?: StreamableHTTPClientTransportOptions) { this._url = url; @@ -521,13 +522,14 @@ export class StreamableHTTPClientTransport implements Transport { message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string; onresumptiontoken?: (token: string) => void } ): Promise { - return this._send(message, options, false); + return this._send(message, options, false, false); } private async _send( message: JSONRPCMessage | JSONRPCMessage[], options: { resumptionToken?: string; onresumptiontoken?: (token: string) => void } | undefined, - isAuthRetry: boolean + isAuthRetry: boolean, + isSessionRetry: boolean ): Promise { try { const { resumptionToken, onresumptiontoken } = options || {}; @@ -579,7 +581,7 @@ export class StreamableHTTPClientTransport implements Transport { }); await response.text?.().catch(() => {}); // Purposely _not_ awaited, so we don't call onerror twice - return this._send(message, options, true); + return this._send(message, options, true, isSessionRetry); } await response.text?.().catch(() => {}); if (isAuthRetry) { @@ -593,6 +595,15 @@ export class StreamableHTTPClientTransport implements Transport { const text = await response.text?.().catch(() => null); + if (response.status === 404 && this._sessionId && !isSessionRetry) { + this._sessionId = undefined; + await this.onsessionexpired?.(); + + if (this._sessionId) { + return this._send(message, options, isAuthRetry, true); + } + } + if (response.status === 403 && this._oauthProvider) { const { resourceMetadataUrl, scope, error } = extractWWWAuthenticateParams(response); @@ -629,7 +640,7 @@ export class StreamableHTTPClientTransport implements Transport { throw new UnauthorizedError(); } - return this._send(message, options, isAuthRetry); + return this._send(message, options, isAuthRetry, isSessionRetry); } } diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index 6542302c9d..00639d0988 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -1,9 +1,10 @@ import type { JSONRPCMessage, JSONRPCRequest } from '@modelcontextprotocol/core'; -import { OAuthError, OAuthErrorCode, SdkErrorCode, SdkHttpError } from '@modelcontextprotocol/core'; +import { LATEST_PROTOCOL_VERSION, OAuthError, OAuthErrorCode, SdkErrorCode, SdkHttpError } from '@modelcontextprotocol/core'; import type { Mock, Mocked } from 'vitest'; import type { OAuthClientProvider } from '../../src/client/auth.js'; import { UnauthorizedError } from '../../src/client/auth.js'; +import { Client } from '../../src/client/client.js'; import type { ReconnectionScheduler, StartSSEOptions, StreamableHTTPReconnectionOptions } from '../../src/client/streamableHttp.js'; import { StreamableHTTPClientTransport } from '../../src/client/streamableHttp.js'; @@ -249,6 +250,96 @@ describe('StreamableHTTPClientTransport', () => { expect(errorSpy).toHaveBeenCalled(); }); + it('reinitializes and retries once when a persisted session expires', async () => { + const client = new Client({ name: 'test-client', version: '1.0.0' }); + const httpTransport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + let initializeCount = 0; + let firstPing = true; + + (globalThis.fetch as Mock).mockImplementation(async (_url, init) => { + if (init.method === 'GET') { + return { + ok: false, + status: 405, + statusText: 'Method Not Allowed', + headers: new Headers(), + text: async () => '' + }; + } + + const body = JSON.parse(init.body as string) as JSONRPCRequest; + + if (body.method === 'initialize') { + const sessionId = initializeCount++ === 0 ? 'old-session-id' : 'new-session-id'; + + return { + ok: true, + status: 200, + headers: new Headers({ + 'content-type': 'application/json', + 'mcp-session-id': sessionId + }), + json: async () => ({ + jsonrpc: '2.0', + id: body.id, + result: { + protocolVersion: LATEST_PROTOCOL_VERSION, + capabilities: {}, + serverInfo: { name: 'test-server', version: '1.0.0' } + } + }) + }; + } + + if (body.method === 'notifications/initialized') { + return { + ok: true, + status: 202, + headers: new Headers(), + text: async () => '' + }; + } + + if (body.method === 'ping' && firstPing) { + firstPing = false; + + return { + ok: false, + status: 404, + statusText: 'Not Found', + headers: new Headers(), + text: async () => 'Session not found' + }; + } + + return { + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'application/json' }), + json: async () => ({ + jsonrpc: '2.0', + id: body.id, + result: {} + }) + }; + }); + + try { + await client.connect(httpTransport); + await expect(client.ping()).resolves.toEqual({}); + + const calls = (globalThis.fetch as Mock).mock.calls; + const postCalls = calls.filter(([, init]) => init.method === 'POST'); + expect(postCalls).toHaveLength(6); + expect(postCalls[2]![1].headers.get('mcp-session-id')).toBe('old-session-id'); + expect(postCalls[3]![1].headers.get('mcp-session-id')).toBeNull(); + expect(postCalls[5]![1].headers.get('mcp-session-id')).toBe('new-session-id'); + expect(httpTransport.sessionId).toBe('new-session-id'); + } finally { + await client.close().catch(() => {}); + } + }); + it('should handle non-streaming JSON response', async () => { const message: JSONRPCMessage = { jsonrpc: '2.0', From 116b048f2b25fcececb2c6b5ab90584b9933533c Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Wed, 27 May 2026 17:29:05 +0800 Subject: [PATCH 2/2] chore: add streamable session changeset --- .changeset/steady-rivers-reinit.md | 5 +++++ test/e2e/requirements.ts | 10 +++------- test/e2e/scenarios/transport-http.test.ts | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) create mode 100644 .changeset/steady-rivers-reinit.md diff --git a/.changeset/steady-rivers-reinit.md b/.changeset/steady-rivers-reinit.md new file mode 100644 index 0000000000..76b3f5fc63 --- /dev/null +++ b/.changeset/steady-rivers-reinit.md @@ -0,0 +1,5 @@ +--- +"@modelcontextprotocol/client": patch +--- + +fix: reinitialize expired streamable sessions diff --git a/test/e2e/requirements.ts b/test/e2e/requirements.ts index ea471a21fc..ee4d618388 100644 --- a/test/e2e/requirements.ts +++ b/test/e2e/requirements.ts @@ -1641,7 +1641,8 @@ export const REQUIREMENTS: Record = { 'client-transport:http:404-surfaces': { source: 'sdk', - behavior: 'A 404 (session expired) on a request surfaces as an error to the caller.', + behavior: + 'A 404 (session expired) on a request surfaces as an error to the caller when the transport has no session recovery hook.', transports: ['streamableHttp'], note: 'Session-id continuity testing requires the per-session host (404 is session-not-found).' }, @@ -1650,12 +1651,7 @@ export const REQUIREMENTS: Record = { behavior: 'A 404 in response to a request carrying a session ID makes the client start a new session with a fresh InitializeRequest and no session ID attached.', transports: ['streamableHttp'], - note: 'This exercises the StreamableHTTP client transport directly; the matrix transport arg is ignored, so it runs as a single streamableHttp-labelled cell to avoid duplicate runs.', - knownFailures: [ - { - note: 'On a 404 for an existing session the transport throws StreamableHTTPError (streamableHttp.ts:551) and never re-initializes — no session recovery is attempted.' - } - ] + note: 'This exercises the StreamableHTTP client transport directly; the matrix transport arg is ignored, so it runs as a single streamableHttp-labelled cell to avoid duplicate runs.' }, 'client-transport:http:accept-header-get': { source: 'https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#listening-for-messages-from-the-server', diff --git a/test/e2e/scenarios/transport-http.test.ts b/test/e2e/scenarios/transport-http.test.ts index b0bd8abbd8..33d6f6d488 100644 --- a/test/e2e/scenarios/transport-http.test.ts +++ b/test/e2e/scenarios/transport-http.test.ts @@ -376,6 +376,7 @@ verifies('client-transport:http:404-surfaces', async (_args: TestArgs) => { await client.connect(transport); sessionIdToBreak = transport.sessionId; + transport.onsessionexpired = undefined; const call = client.ping(); await expect(call).rejects.toThrow();