Skip to content

Commit a1de5be

Browse files
authored
feat: make MCP Server stateful for multiple serverless container (#37)
1 parent 48fd45b commit a1de5be

File tree

8 files changed

+371
-74
lines changed

8 files changed

+371
-74
lines changed

.vscode/settings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
},
4141
"cSpell.language": "en-US",
4242
"cSpell.words": [
43+
"AWSS",
4344
"buildkitd",
4445
"buildx",
4546
"CERTDIR",
@@ -55,6 +56,7 @@
5556
"modelcontextprotocol",
5657
"oninitialized",
5758
"PKCE",
59+
"resumability",
5860
"rspack",
5961
"Streamable",
6062
"tsparser",

README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ A playground for Model Context Protocol (MCP) server built with TypeScript and S
77
- MCP Server implementation: HTTP-Based Streamable transport using `@modelcontextprotocol/sdk` with HTTP transport, session management, and tool execution.
88
- OAuth authentication/3rd party authorization: Implements an OAuth server for MCP clients to process 3rd party authorization servers like Auth0, providing Dynamic Application Registration for MCP server.
99
- Storage: Provide storage for MCP server to store data like OAuth sessions, tokens, etc.
10-
- Session Management: MCP server can manage multiple sessions stateless.
10+
- Session Management: Support stateful sessions by using replay of initial request.
1111
- Tools: `echo`, `system-time`, `streaming`, `project` for demonstration.
1212
- Prompts: `echo`
1313

@@ -133,6 +133,20 @@ helm install mcp-server-playground chrisleekr/mcp-server-playground
133133
- JSON Web Token (JWT) Signature Algorithm: RS256
134134
- Click on "Create"
135135

136+
## How to make stateful session with multiple MCP Server instances?
137+
138+
When the MCP server is deployed as a cluster, it is not possible to make it stateful with multiple MCP Server instances because the transport is not shared between instances by design.
139+
140+
To make it truly stateful, I used Valkey to store the session id with the initial request.
141+
142+
When the request comes in to alternative MCP server instance, it will check if the session id is in the Valkey. If it is, it will replay the initial request and connect the transport to the server.
143+
144+
Inspired from [https://github.com/modelcontextprotocol/modelcontextprotocol/discussions/102](https://github.com/modelcontextprotocol/modelcontextprotocol/discussions/102)
145+
146+
The below diagram shows the flow of the stateful session management.
147+
148+
<img width="681" height="882" alt="Image" src="https://github.com/user-attachments/assets/7f56339e-2665-47cb-a882-69d3c7096b47" />
149+
136150
## TODO
137151

138152
- [ ] Streaming is not working as expected. It returns the final result instead of streaming the data.

src/core/server/http/handlers/mcpDelete.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ export function setupMCPDeleteHandler(
1919
if (
2020
sessionId === undefined ||
2121
sessionId.trim() === '' ||
22-
!(await transportManager.hasTransport(sessionId))
22+
!(await transportManager.hasSession(sessionId))
2323
) {
2424
loggingContext.log('error', 'Session not found');
2525
res.status(200).json({ error: 'Session not found' }); // Return 200 to gracefully handle the request
2626
return;
2727
}
2828

2929
loggingContext.log('debug', 'Found session, getting transport');
30-
const transport = await transportManager.getTransport(sessionId);
30+
const transport = transportManager.getTransport(sessionId);
3131
if (!transport) {
3232
loggingContext.log('error', 'Transport not found');
3333
res.status(200).json({ error: 'Transport not found' }); // Return 200 to gracefully handle the request

src/core/server/http/handlers/mcpPost.ts

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,37 +37,64 @@ export function setupMCPPostHandler(
3737
if (
3838
sessionId !== undefined &&
3939
sessionId.trim() !== '' &&
40-
(await transportManager.hasTransport(sessionId))
40+
(await transportManager.hasSession(sessionId))
4141
) {
42-
loggingContext.log('debug', 'Found session, getting transport');
43-
// Reuse existing transport
44-
const existingTransport =
45-
await transportManager.getTransport(sessionId);
46-
if (!existingTransport) {
42+
loggingContext.log('debug', 'Session exists, checking transport', {
43+
data: { sessionId },
44+
});
45+
// If transport exists in this server, then use the existing transport.
46+
// If transport does not exist in this server, then reply initial request and create a new transport based on the sessionId.
47+
if (transportManager.hasTransport(sessionId)) {
48+
loggingContext.log(
49+
'debug',
50+
'Found transport in this server, using existing transport'
51+
);
52+
// Reuse existing transport
53+
const existingTransport = transportManager.getTransport(sessionId);
54+
if (!existingTransport) {
55+
loggingContext.log(
56+
'error',
57+
'Transport not found despite has() check'
58+
);
59+
throw new Error('Transport not found despite has() check');
60+
}
61+
transport = existingTransport;
62+
} else {
4763
loggingContext.log(
48-
'error',
49-
'Transport not found despite has() check'
64+
'debug',
65+
'No transport found in this server, replay initial request'
5066
);
51-
throw new Error('Transport not found despite has() check');
67+
// Setup a new transport for the session
68+
transport = await transportManager.replayInitialRequest(sessionId);
5269
}
53-
loggingContext.log(
54-
'debug',
55-
'Transport found, using existing transport'
56-
);
57-
transport = existingTransport;
5870
} else if (
59-
sessionId === undefined &&
71+
(sessionId === undefined || sessionId.trim() === '') &&
6072
isInitializeRequest(requestBody)
6173
) {
6274
loggingContext.log('debug', 'No session found, creating new session');
6375
// New initialization request
6476
const newSessionId = randomUUID();
6577

66-
transport = await transportManager.createTransport(newSessionId);
78+
transport = transportManager.createTransport(newSessionId);
79+
80+
// Save the initial request to the session
81+
await transportManager.saveSession(newSessionId, {
82+
initialRequest: requestBody,
83+
});
84+
85+
// Connect the transport to the server
86+
loggingContext.log('debug', 'Connecting transport to server');
87+
await transportManager.getServer().connect(transport);
6788
} else {
6889
loggingContext.log(
6990
'error',
70-
'Invalid request: missing session ID or not an initialization request'
91+
'Invalid request: missing session ID or not an initialization request',
92+
{
93+
data: {
94+
sessionId,
95+
requestBody,
96+
},
97+
}
7198
);
7299
res.status(400).json({
73100
error:
@@ -76,6 +103,12 @@ export function setupMCPPostHandler(
76103
return;
77104
}
78105

106+
loggingContext.log('debug', 'Handling request', {
107+
data: {
108+
sessionId,
109+
requestBody,
110+
},
111+
});
79112
await transport.handleRequest(req, res, requestBody);
80113
loggingContext.log('debug', 'Request handled');
81114
return;

src/core/server/transport/manager.ts

Lines changed: 105 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
22
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
3+
import { InitializeRequest } from '@modelcontextprotocol/sdk/types.js';
4+
import type express from 'express';
35

46
import { config } from '@/config/manager';
57
import { createStorage } from '@/core/storage/storageFactory';
@@ -8,10 +10,16 @@ import { Storage } from '@/core/storage/types';
810
import { loggingContext } from '../http/context';
911

1012
export class TransportManager {
13+
// MCP server instance.
1114
private server: Server;
15+
16+
// Storage for session data to keep track of the initial request.
1217
private storage: Storage;
18+
19+
// Map of sessionId to transport in this server memory.
1320
private transports: Map<string, StreamableHTTPServerTransport> = new Map();
1421

22+
// Prefix for the session data cache key.
1523
private readonly CACHE_KEY_PREFIX = 'mcp-session';
1624

1725
constructor(server: Server) {
@@ -28,96 +36,145 @@ export class TransportManager {
2836
}
2937
}
3038

31-
public async getTransport(
32-
sessionId: string
33-
): Promise<StreamableHTTPServerTransport | undefined> {
34-
loggingContext.log('debug', 'Getting transport', {
39+
public getServer(): Server {
40+
return this.server;
41+
}
42+
43+
public async hasSession(sessionId: string): Promise<boolean> {
44+
loggingContext.log('debug', 'Checking if session exists', {
3545
data: { sessionId },
3646
});
37-
38-
// If storage contains sessionId, then check transports with sessionId. If not, then create a new transport with sessionId.
3947
const session = await this.storage.get(
4048
`${this.CACHE_KEY_PREFIX}:${sessionId}`
4149
);
42-
if (session !== null && session.trim() !== '') {
43-
if (this.transports.has(sessionId)) {
44-
loggingContext.log('debug', 'Transport found in transports', {
45-
data: { sessionId },
46-
});
47-
return this.transports.get(sessionId);
48-
}
49-
50-
// It exists in storage, but not in transports. Create a new transport with sessionId.
51-
loggingContext.log(
52-
'debug',
53-
'Transport not found in transports, creating new transport'
54-
);
55-
const newTransport = await this.createTransport(sessionId);
56-
this.transports.set(sessionId, newTransport);
50+
return session !== null && session.trim() !== '';
51+
}
5752

58-
return newTransport;
53+
public async saveSession(
54+
sessionId: string,
55+
sessionData: {
56+
initialRequest: InitializeRequest;
5957
}
58+
): Promise<void> {
59+
// TODO: Make this to be expired after a certain time.
60+
await this.storage.set(
61+
`${this.CACHE_KEY_PREFIX}:${sessionId}`,
62+
JSON.stringify(sessionData)
63+
);
64+
}
6065

61-
// If session is not found, then return undefined.
62-
return undefined;
66+
public hasTransport(sessionId: string): boolean {
67+
return this.transports.has(sessionId);
6368
}
6469

65-
public async hasTransport(sessionId: string): Promise<boolean> {
66-
loggingContext.log('debug', 'Checking if transport exists', {
67-
data: { sessionId },
68-
});
70+
public getTransport(
71+
sessionId: string
72+
): StreamableHTTPServerTransport | undefined {
73+
return this.transports.get(sessionId);
74+
}
75+
76+
public async replayInitialRequest(
77+
sessionId: string
78+
): Promise<StreamableHTTPServerTransport> {
6979
const session = await this.storage.get(
7080
`${this.CACHE_KEY_PREFIX}:${sessionId}`
7181
);
72-
return session !== null && session.trim() !== '';
82+
if (session === null || session.trim() === '') {
83+
throw new Error('Session not found');
84+
}
85+
const sessionData = JSON.parse(session) as {
86+
initialRequest: InitializeRequest;
87+
};
88+
loggingContext.log('debug', 'Replaying initial request', {
89+
data: { sessionData },
90+
});
91+
92+
const transport = this.createTransport(sessionId);
93+
94+
// Replay initial request with dummy request and response.
95+
// This is to simulate the initial request and response.
96+
await transport.handleRequest(
97+
{
98+
method: 'POST',
99+
url: '/mcp',
100+
headers: {
101+
accept: ['application/json', 'text/event-stream'],
102+
'content-type': ['application/json', 'text/event-stream'],
103+
},
104+
body: JSON.stringify(sessionData.initialRequest),
105+
} as unknown as express.Request,
106+
{
107+
on: () => {},
108+
writeHead: () => {
109+
return {
110+
end: () => {},
111+
} as unknown as express.Response;
112+
},
113+
} as unknown as express.Response,
114+
sessionData.initialRequest
115+
);
116+
117+
loggingContext.log(
118+
'debug',
119+
'Initial request replayed, connecting transport',
120+
{
121+
data: {
122+
sessionId,
123+
transportCount: this.transports.size,
124+
},
125+
}
126+
);
127+
128+
await this.server.connect(transport);
129+
130+
return transport;
73131
}
74132

75-
public async createTransport(
76-
sessionId: string
77-
): Promise<StreamableHTTPServerTransport> {
133+
public createTransport(sessionId: string): StreamableHTTPServerTransport {
78134
const transport = new StreamableHTTPServerTransport({
79135
/**
80136
* Function that generates a session ID for the transport.
81137
* The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash)
82138
*
83139
* Return undefined to disable session management.
84140
*/
85-
// This is disabled to make stateless mode.
86-
sessionIdGenerator: undefined,
87-
// Below is for stateful mode.
88-
// sessionIdGenerator: (): string => sessionId,
141+
sessionIdGenerator: (): string => sessionId,
142+
89143
/**
90144
* If true, the server will return JSON responses instead of starting an SSE stream.
91145
* This can be useful for simple request/response scenarios without streaming.
92146
* Default is false (SSE streams are preferred).
93147
*/
94148
enableJsonResponse: false,
149+
/**
150+
* TODO: Make custom event store for persistent storage.
151+
* Event store for resumability support
152+
* If provided, resumability will be enabled, allowing clients to reconnect and resume messages
153+
*/
154+
// eventStore?: EventStore;
95155
});
96156

97-
// Manually set the session ID to ensure it's available
98-
transport.sessionId = sessionId;
99157
loggingContext.log('debug', 'Creating transport', {
100158
data: { sessionId },
101159
});
102160

103161
this.transports.set(sessionId, transport);
104-
await this.storage.set(
105-
`${this.CACHE_KEY_PREFIX}:${sessionId}`,
106-
JSON.stringify({
107-
createdAt: new Date().toISOString(),
108-
})
109-
);
110162

111-
loggingContext.log('debug', 'Transport created');
163+
loggingContext.log('debug', 'Transport created', {
164+
data: {
165+
sessionId: transport.sessionId,
166+
},
167+
});
112168

113169
// Set up cleanup handler
114170
transport.onclose = (): void => {
115171
const currentSessionId = transport.sessionId;
116172
if (currentSessionId !== undefined && currentSessionId.trim() !== '') {
117173
this.transports.delete(currentSessionId);
118-
void this.storage.delete(
119-
`${this.CACHE_KEY_PREFIX}:${currentSessionId}`
120-
);
174+
// Shouldn't delete the session data from storage to avoid sunset server deleting the session data.
175+
// void this.storage.delete(
176+
// `${this.CACHE_KEY_PREFIX}:${currentSessionId}`
177+
// );
121178
loggingContext.log('debug', 'Transport closed and cleaned up', {
122179
data: {
123180
transportCount: this.transports.size,
@@ -126,12 +183,6 @@ export class TransportManager {
126183
}
127184
};
128185

129-
// Connect the transport to the server
130-
loggingContext.log('debug', 'Connecting transport to server');
131-
await this.server.connect(
132-
transport as StreamableHTTPServerTransport & { sessionId: string }
133-
);
134-
135186
return transport;
136187
}
137188

0 commit comments

Comments
 (0)