Skip to content

Commit 48fd45b

Browse files
authored
feat: add stateless transport manager with cache (#36)
1 parent 48e9fa8 commit 48fd45b

File tree

10 files changed

+127
-61
lines changed

10 files changed

+127
-61
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"CERTDIR",
4646
"chrislee",
4747
"chrisleekr",
48+
"dbsize",
4849
"dind",
4950
"dpop",
5051
"interruptible",

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ A playground for Model Context Protocol (MCP) server built with TypeScript and S
77
- MCP Server implementation: HTTP-Based Streamable transport using `@modelcontextprotocol/sdk` with HTTP transport, session management, and tool execution.
88
- OAuth authentication/3rd party authorization: Implements an OAuth server for MCP clients to process 3rd party authorization servers like Auth0, providing Dynamic Application Registration for MCP server.
99
- Storage: Provide storage for MCP server to store data like OAuth sessions, tokens, etc.
10+
- Session Management: MCP server can manage multiple sessions stateless.
1011
- Tools: `echo`, `system-time`, `streaming`, `project` for demonstration.
1112
- Prompts: `echo`
1213

src/core/server/http/handlers.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
21
import express from 'express';
32

43
import { TransportManager } from '../transport';
@@ -9,7 +8,6 @@ import { setupPingHandler } from './handlers/ping';
98

109
export function setupRequestHandlers(
1110
app: express.Application,
12-
server: Server,
1311
transportManager: TransportManager
1412
): void {
1513
app.get('/', (_req, res) => {
@@ -18,8 +16,8 @@ export function setupRequestHandlers(
1816
});
1917

2018
setupPingHandler(app);
21-
setupMCPPostHandler(app, server, transportManager);
22-
setupMCPDeleteHandler(app, server, transportManager);
19+
setupMCPPostHandler(app, transportManager);
20+
setupMCPDeleteHandler(app, transportManager);
2321

2422
loggingContext.log('info', 'Request handlers setup complete');
2523
}

src/core/server/http/handlers/mcpDelete.ts

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
21
import express from 'express';
32

43
import type { TransportManager } from '@/core/server/transport';
@@ -7,7 +6,6 @@ import { loggingContext } from '../context';
76

87
export function setupMCPDeleteHandler(
98
app: express.Application,
10-
_server: Server,
119
transportManager: TransportManager
1210
): void {
1311
// Handle MCP DELETE requests (session termination)
@@ -21,31 +19,26 @@ export function setupMCPDeleteHandler(
2119
if (
2220
sessionId === undefined ||
2321
sessionId.trim() === '' ||
24-
!transportManager.hasTransport(sessionId)
22+
!(await transportManager.hasTransport(sessionId))
2523
) {
26-
loggingContext.log('error', 'Session not found', {
27-
data: { sessionId },
28-
});
24+
loggingContext.log('error', 'Session not found');
2925
res.status(200).json({ error: 'Session not found' }); // Return 200 to gracefully handle the request
3026
return;
3127
}
3228

33-
const transport = transportManager.getTransport(sessionId);
29+
loggingContext.log('debug', 'Found session, getting transport');
30+
const transport = await transportManager.getTransport(sessionId);
3431
if (!transport) {
35-
loggingContext.log('error', 'Transport not found', {
36-
data: { sessionId },
37-
});
32+
loggingContext.log('error', 'Transport not found');
3833
res.status(200).json({ error: 'Transport not found' }); // Return 200 to gracefully handle the request
3934
return;
4035
}
4136
await transport.handleRequest(req, res);
4237

4338
// Clean up the transport
44-
transportManager.deleteTransport(sessionId);
39+
await transportManager.deleteTransport(sessionId);
4540

46-
loggingContext.log('info', 'Session terminated', {
47-
data: { sessionId },
48-
});
41+
loggingContext.log('info', 'Session terminated');
4942
} catch (error: unknown) {
5043
loggingContext.log('error', 'Error handling DELETE request', {
5144
data: {

src/core/server/http/handlers/mcpPost.ts

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
1+
import { randomUUID } from 'node:crypto';
2+
23
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
34
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
45
import express from 'express';
@@ -18,7 +19,6 @@ function getSessionId(req: express.Request): string | undefined {
1819

1920
export function setupMCPPostHandler(
2021
app: express.Application,
21-
server: Server,
2222
transportManager: TransportManager
2323
): void {
2424
// Handle MCP POST requests (Streamable HTTP)
@@ -29,47 +29,45 @@ export function setupMCPPostHandler(
2929
const sessionId = getSessionId(req);
3030

3131
loggingContext.log('debug', 'POST /mcp request body', {
32-
data: { requestBody, sessionId },
32+
data: { requestBody },
3333
});
3434

3535
let transport: StreamableHTTPServerTransport;
3636

3737
if (
3838
sessionId !== undefined &&
3939
sessionId.trim() !== '' &&
40-
transportManager.hasTransport(sessionId)
40+
(await transportManager.hasTransport(sessionId))
4141
) {
42+
loggingContext.log('debug', 'Found session, getting transport');
4243
// Reuse existing transport
43-
const existingTransport = transportManager.getTransport(sessionId);
44+
const existingTransport =
45+
await transportManager.getTransport(sessionId);
4446
if (!existingTransport) {
4547
loggingContext.log(
4648
'error',
47-
'Transport not found despite has() check',
48-
{
49-
data: { sessionId },
50-
}
49+
'Transport not found despite has() check'
5150
);
5251
throw new Error('Transport not found despite has() check');
5352
}
53+
loggingContext.log(
54+
'debug',
55+
'Transport found, using existing transport'
56+
);
5457
transport = existingTransport;
5558
} else if (
5659
sessionId === undefined &&
5760
isInitializeRequest(requestBody)
5861
) {
62+
loggingContext.log('debug', 'No session found, creating new session');
5963
// New initialization request
60-
transport = transportManager.createTransport();
64+
const newSessionId = randomUUID();
6165

62-
// Connect the transport to the server
63-
await server.connect(
64-
transport as StreamableHTTPServerTransport & { sessionId: string }
65-
);
66+
transport = await transportManager.createTransport(newSessionId);
6667
} else {
6768
loggingContext.log(
6869
'error',
69-
'Invalid request: missing session ID or not an initialization request',
70-
{
71-
data: { sessionId },
72-
}
70+
'Invalid request: missing session ID or not an initialization request'
7371
);
7472
res.status(400).json({
7573
error:
@@ -79,6 +77,7 @@ export function setupMCPPostHandler(
7977
}
8078

8179
await transport.handleRequest(req, res, requestBody);
80+
loggingContext.log('debug', 'Request handled');
8281
return;
8382
} catch (error) {
8483
loggingContext.log('error', 'Error handling HTTP request', {

src/core/server/http/server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import { setupMiddleware } from './middleware';
88

99
export function setupHttpServer(server: Server): express.Application {
1010
const app = express();
11-
const transportManager = new TransportManager();
11+
const transportManager = new TransportManager(server);
1212

1313
// Setup middleware
1414
setupMiddleware(app);
1515

1616
// Setup request handlers
17-
setupRequestHandlers(app, server, transportManager);
17+
setupRequestHandlers(app, transportManager);
1818

1919
// Setup auth handlers
2020
setupAuthHandlers(app);
Lines changed: 88 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,91 @@
1-
import { randomUUID } from 'node:crypto';
2-
1+
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
32
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
43

4+
import { config } from '@/config/manager';
5+
import { createStorage } from '@/core/storage/storageFactory';
6+
import { Storage } from '@/core/storage/types';
7+
58
import { loggingContext } from '../http/context';
69

710
export class TransportManager {
11+
private server: Server;
12+
private storage: Storage;
813
private transports: Map<string, StreamableHTTPServerTransport> = new Map();
914

10-
public getTransport(
15+
private readonly CACHE_KEY_PREFIX = 'mcp-session';
16+
17+
constructor(server: Server) {
18+
this.server = server;
19+
20+
try {
21+
this.storage = createStorage(config.storage);
22+
} catch (error: unknown) {
23+
loggingContext.log('error', 'Failed to create storage', {
24+
error: error instanceof Error ? error.message : 'Unknown error',
25+
stack: error instanceof Error ? error.stack : undefined,
26+
});
27+
throw error;
28+
}
29+
}
30+
31+
public async getTransport(
1132
sessionId: string
12-
): StreamableHTTPServerTransport | undefined {
33+
): Promise<StreamableHTTPServerTransport | undefined> {
1334
loggingContext.log('debug', 'Getting transport', {
1435
data: { sessionId },
1536
});
16-
return this.transports.get(sessionId);
37+
38+
// If storage contains sessionId, then check transports with sessionId. If not, then create a new transport with sessionId.
39+
const session = await this.storage.get(
40+
`${this.CACHE_KEY_PREFIX}:${sessionId}`
41+
);
42+
if (session !== null && session.trim() !== '') {
43+
if (this.transports.has(sessionId)) {
44+
loggingContext.log('debug', 'Transport found in transports', {
45+
data: { sessionId },
46+
});
47+
return this.transports.get(sessionId);
48+
}
49+
50+
// It exists in storage, but not in transports. Create a new transport with sessionId.
51+
loggingContext.log(
52+
'debug',
53+
'Transport not found in transports, creating new transport'
54+
);
55+
const newTransport = await this.createTransport(sessionId);
56+
this.transports.set(sessionId, newTransport);
57+
58+
return newTransport;
59+
}
60+
61+
// If session is not found, then return undefined.
62+
return undefined;
1763
}
1864

19-
public hasTransport(sessionId: string): boolean {
65+
public async hasTransport(sessionId: string): Promise<boolean> {
2066
loggingContext.log('debug', 'Checking if transport exists', {
2167
data: { sessionId },
2268
});
23-
return this.transports.has(sessionId);
69+
const session = await this.storage.get(
70+
`${this.CACHE_KEY_PREFIX}:${sessionId}`
71+
);
72+
return session !== null && session.trim() !== '';
2473
}
2574

26-
public createTransport(): StreamableHTTPServerTransport {
27-
const newSessionId = randomUUID();
75+
public async createTransport(
76+
sessionId: string
77+
): Promise<StreamableHTTPServerTransport> {
2878
const transport = new StreamableHTTPServerTransport({
2979
/**
3080
* Function that generates a session ID for the transport.
3181
* The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash)
3282
*
3383
* Return undefined to disable session management.
3484
*/
35-
sessionIdGenerator: (): string => newSessionId,
85+
// This is disabled to make stateless mode.
86+
sessionIdGenerator: undefined,
87+
// Below is for stateful mode.
88+
// sessionIdGenerator: (): string => sessionId,
3689
/**
3790
* If true, the server will return JSON responses instead of starting an SSE stream.
3891
* This can be useful for simple request/response scenarios without streaming.
@@ -42,45 +95,57 @@ export class TransportManager {
4295
});
4396

4497
// Manually set the session ID to ensure it's available
45-
transport.sessionId = newSessionId;
98+
transport.sessionId = sessionId;
4699
loggingContext.log('debug', 'Creating transport', {
47-
data: { sessionId: newSessionId },
100+
data: { sessionId },
48101
});
49-
this.transports.set(newSessionId, transport);
50102

51-
loggingContext.log('debug', 'Transport created', {
52-
data: { sessionId: newSessionId },
53-
});
103+
this.transports.set(sessionId, transport);
104+
await this.storage.set(
105+
`${this.CACHE_KEY_PREFIX}:${sessionId}`,
106+
JSON.stringify({
107+
createdAt: new Date().toISOString(),
108+
})
109+
);
110+
111+
loggingContext.log('debug', 'Transport created');
54112

55113
// Set up cleanup handler
56114
transport.onclose = (): void => {
57115
const currentSessionId = transport.sessionId;
58116
if (currentSessionId !== undefined && currentSessionId.trim() !== '') {
59117
this.transports.delete(currentSessionId);
118+
void this.storage.delete(
119+
`${this.CACHE_KEY_PREFIX}:${currentSessionId}`
120+
);
60121
loggingContext.log('debug', 'Transport closed and cleaned up', {
61122
data: {
62-
sessionId: currentSessionId,
63123
transportCount: this.transports.size,
64124
},
65125
});
66126
}
67127
};
68128

129+
// Connect the transport to the server
130+
loggingContext.log('debug', 'Connecting transport to server');
131+
await this.server.connect(
132+
transport as StreamableHTTPServerTransport & { sessionId: string }
133+
);
134+
69135
return transport;
70136
}
71137

72-
public deleteTransport(sessionId: string): void {
138+
public async deleteTransport(sessionId: string): Promise<void> {
73139
this.transports.delete(sessionId);
74-
loggingContext.log('info', 'Transport deleted', {
75-
data: { sessionId },
76-
});
140+
await this.storage.delete(`${this.CACHE_KEY_PREFIX}:${sessionId}`);
141+
loggingContext.log('info', 'Transport deleted');
77142
}
78143

79144
public getTransportCount(): number {
80145
return this.transports.size;
81146
}
82147

83-
public getAllSessionIds(): string[] {
84-
return Array.from(this.transports.keys());
148+
public async getAllSessionIds(): Promise<string[]> {
149+
return this.storage.keys(`${this.CACHE_KEY_PREFIX}:*`);
85150
}
86151
}

src/core/storage/memory.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,8 @@ export class MemoryStorage implements Storage {
3535
async keys(pattern: string): Promise<string[]> {
3636
return Array.from(this.store.keys()).filter(key => key.startsWith(pattern));
3737
}
38+
39+
async length(): Promise<number> {
40+
return this.store.size;
41+
}
3842
}

src/core/storage/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ export interface Storage {
44
delete(key: string): Promise<boolean>;
55
keys(pattern: string): Promise<string[]>;
66
close(): Promise<void>;
7+
length(): Promise<number>;
78
}

src/core/storage/valkey.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,8 @@ export class ValkeyStorage implements Storage {
4747
const keys = await this.client.keys(pattern);
4848
return keys.map(key => key.replace(pattern, ''));
4949
}
50+
51+
async length(): Promise<number> {
52+
return this.client.dbsize();
53+
}
5054
}

0 commit comments

Comments
 (0)