Skip to content

Commit f332c59

Browse files
committed
feat: add Dispatcher Protocol and DirectDispatcher
Introduces the Dispatcher abstraction that decouples MCP request/response handling from JSON-RPC framing. A Dispatcher exposes call/notify for outbound messages and run(on_call, on_notify) for inbound dispatch, with no knowledge of MCP types or wire encoding. - shared/dispatcher.py: Dispatcher, DispatchContext, RequestSender Protocols; CallOptions, OnCall/OnNotify, ProgressFnT, DispatchMiddleware - shared/transport_context.py: TransportContext base dataclass - shared/direct_dispatcher.py: in-memory Dispatcher impl that wires two peers with no transport; serves as a fast test substrate and second-impl proof - shared/exceptions.py: NoBackChannelError(MCPError) for transports without a server-to-client request channel - types: REQUEST_CANCELLED SDK error code The JSON-RPC implementation and ServerRunner that consume this Protocol land in follow-up PRs.
1 parent 3d7b311 commit f332c59

File tree

7 files changed

+646
-1
lines changed

7 files changed

+646
-1
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
"""In-memory `Dispatcher` that wires two peers together with no transport.
2+
3+
`DirectDispatcher` is the simplest possible `Dispatcher` implementation: a call
4+
on one side directly invokes the other side's `on_call`. There is no
5+
serialization, no JSON-RPC framing, and no streams. It exists to:
6+
7+
* prove the `Dispatcher` Protocol is implementable without JSON-RPC
8+
* provide a fast substrate for testing the layers above the dispatcher
9+
(`ServerRunner`, `Context`, `Connection`) without wire-level moving parts
10+
* embed a server in-process when the JSON-RPC overhead is unnecessary
11+
12+
Unlike `JSONRPCDispatcher`, exceptions raised in a handler propagate directly
13+
to the caller — there is no exception-to-`ErrorData` boundary here.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
from collections.abc import Awaitable, Callable, Mapping
19+
from dataclasses import dataclass, field
20+
from typing import Any
21+
22+
import anyio
23+
24+
from mcp.shared.dispatcher import CallOptions, OnCall, OnNotify, ProgressFnT
25+
from mcp.shared.exceptions import MCPError, NoBackChannelError
26+
from mcp.shared.transport_context import TransportContext
27+
from mcp.types import INTERNAL_ERROR, REQUEST_TIMEOUT
28+
29+
__all__ = ["DirectDispatcher", "create_direct_dispatcher_pair"]
30+
31+
DIRECT_TRANSPORT_KIND = "direct"
32+
33+
34+
_Call = Callable[[str, Mapping[str, Any] | None, CallOptions | None], Awaitable[dict[str, Any]]]
35+
_Notify = Callable[[str, Mapping[str, Any] | None], Awaitable[None]]
36+
37+
38+
@dataclass
39+
class _DirectDispatchContext:
40+
"""`DispatchContext` for an inbound call on a `DirectDispatcher`.
41+
42+
The back-channel callables target the *originating* side, so a handler's
43+
`send_request` reaches the peer that made the inbound call.
44+
"""
45+
46+
transport: TransportContext
47+
_back_call: _Call
48+
_back_notify: _Notify
49+
_on_progress: ProgressFnT | None = None
50+
cancel_requested: anyio.Event = field(default_factory=anyio.Event)
51+
52+
async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
53+
await self._back_notify(method, params)
54+
55+
async def send_request(
56+
self,
57+
method: str,
58+
params: Mapping[str, Any] | None,
59+
opts: CallOptions | None = None,
60+
) -> dict[str, Any]:
61+
if not self.transport.can_send_request:
62+
raise NoBackChannelError(method)
63+
return await self._back_call(method, params, opts)
64+
65+
async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
66+
if self._on_progress is not None:
67+
await self._on_progress(progress, total, message)
68+
69+
70+
class DirectDispatcher:
71+
"""A `Dispatcher` that calls a peer's handlers directly, in-process.
72+
73+
Two instances are wired together with `create_direct_dispatcher_pair`; each
74+
holds a reference to the other. `call` on one awaits the peer's `on_call`.
75+
`run` parks until `close` is called.
76+
"""
77+
78+
def __init__(self, transport_ctx: TransportContext):
79+
self._transport_ctx = transport_ctx
80+
self._peer: DirectDispatcher | None = None
81+
self._on_call: OnCall | None = None
82+
self._on_notify: OnNotify | None = None
83+
self._ready = anyio.Event()
84+
self._closed = anyio.Event()
85+
86+
def connect_to(self, peer: DirectDispatcher) -> None:
87+
self._peer = peer
88+
89+
async def call(
90+
self,
91+
method: str,
92+
params: Mapping[str, Any] | None,
93+
opts: CallOptions | None = None,
94+
) -> dict[str, Any]:
95+
if self._peer is None:
96+
raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
97+
return await self._peer._dispatch_call(method, params, opts)
98+
99+
async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
100+
if self._peer is None:
101+
raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
102+
await self._peer._dispatch_notify(method, params)
103+
104+
async def run(self, on_call: OnCall, on_notify: OnNotify) -> None:
105+
self._on_call = on_call
106+
self._on_notify = on_notify
107+
self._ready.set()
108+
await self._closed.wait()
109+
110+
def close(self) -> None:
111+
self._closed.set()
112+
113+
def _make_context(self, on_progress: ProgressFnT | None = None) -> _DirectDispatchContext:
114+
assert self._peer is not None
115+
peer = self._peer
116+
return _DirectDispatchContext(
117+
transport=self._transport_ctx,
118+
_back_call=lambda m, p, o: peer._dispatch_call(m, p, o),
119+
_back_notify=lambda m, p: peer._dispatch_notify(m, p),
120+
_on_progress=on_progress,
121+
)
122+
123+
async def _dispatch_call(
124+
self,
125+
method: str,
126+
params: Mapping[str, Any] | None,
127+
opts: CallOptions | None,
128+
) -> dict[str, Any]:
129+
await self._ready.wait()
130+
assert self._on_call is not None
131+
opts = opts or {}
132+
dctx = self._make_context(on_progress=opts.get("on_progress"))
133+
try:
134+
with anyio.fail_after(opts.get("timeout")):
135+
try:
136+
return await self._on_call(dctx, method, params)
137+
except MCPError:
138+
raise
139+
except Exception as e:
140+
raise MCPError(code=INTERNAL_ERROR, message=str(e)) from e
141+
except TimeoutError:
142+
raise MCPError(
143+
code=REQUEST_TIMEOUT,
144+
message=f"Timed out after {opts.get('timeout')}s waiting for {method!r}",
145+
) from None
146+
147+
async def _dispatch_notify(self, method: str, params: Mapping[str, Any] | None) -> None:
148+
await self._ready.wait()
149+
assert self._on_notify is not None
150+
dctx = self._make_context()
151+
await self._on_notify(dctx, method, params)
152+
153+
154+
def create_direct_dispatcher_pair(
155+
*,
156+
can_send_request: bool = True,
157+
) -> tuple[DirectDispatcher, DirectDispatcher]:
158+
"""Create two `DirectDispatcher` instances wired to each other.
159+
160+
Args:
161+
can_send_request: Sets `TransportContext.can_send_request` on both
162+
sides. Pass ``False`` to simulate a transport with no back-channel.
163+
164+
Returns:
165+
A ``(left, right)`` pair. Conventionally ``left`` is the client side
166+
and ``right`` is the server side, but the wiring is symmetric.
167+
"""
168+
ctx = TransportContext(kind=DIRECT_TRANSPORT_KIND, can_send_request=can_send_request)
169+
left = DirectDispatcher(ctx)
170+
right = DirectDispatcher(ctx)
171+
left.connect_to(right)
172+
right.connect_to(left)
173+
return left, right

src/mcp/shared/dispatcher.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
"""Dispatcher Protocol — the call/return boundary between transports and handlers.
2+
3+
A Dispatcher turns a duplex message channel into two things:
4+
5+
* an outbound API: ``call(method, params)`` and ``notify(method, params)``
6+
* an inbound pump: ``run(on_call, on_notify)`` that drives the receive loop and
7+
invokes the supplied handlers for each incoming request/notification
8+
9+
It is deliberately *not* MCP-aware. Method names are strings, params and
10+
results are ``dict[str, Any]``. The MCP type layer (request/result models,
11+
capability negotiation, ``Context``) sits above this; the wire encoding
12+
(JSON-RPC, gRPC, in-process direct calls) sits below it.
13+
14+
See ``JSONRPCDispatcher`` for the production implementation and
15+
``DirectDispatcher`` for an in-memory implementation used in tests and for
16+
embedding a server in-process.
17+
"""
18+
19+
from collections.abc import Awaitable, Callable, Mapping
20+
from typing import Any, Protocol, TypedDict, TypeVar, runtime_checkable
21+
22+
import anyio
23+
24+
from mcp.shared.transport_context import TransportContext
25+
26+
__all__ = [
27+
"CallOptions",
28+
"DispatchContext",
29+
"DispatchMiddleware",
30+
"Dispatcher",
31+
"OnCall",
32+
"OnNotify",
33+
"ProgressFnT",
34+
"RequestSender",
35+
]
36+
37+
TransportT_co = TypeVar("TransportT_co", bound=TransportContext, covariant=True)
38+
39+
40+
class ProgressFnT(Protocol):
41+
"""Callback invoked when a progress notification arrives for a pending call."""
42+
43+
async def __call__(self, progress: float, total: float | None, message: str | None) -> None: ...
44+
45+
46+
class CallOptions(TypedDict, total=False):
47+
"""Per-call options for `RequestSender.send_request` / `Dispatcher.call`.
48+
49+
All keys are optional. Dispatchers ignore keys they do not understand.
50+
"""
51+
52+
timeout: float
53+
"""Seconds to wait for a result before raising and sending ``notifications/cancelled``."""
54+
55+
on_progress: ProgressFnT
56+
"""Receive ``notifications/progress`` updates for this call."""
57+
58+
resumption_token: str
59+
"""Opaque token to resume a previously interrupted call (transport-dependent)."""
60+
61+
on_resumption_token: Callable[[str], Awaitable[None]]
62+
"""Receive a resumption token when the transport issues one."""
63+
64+
65+
@runtime_checkable
66+
class RequestSender(Protocol):
67+
"""Anything that can send a request and await its result.
68+
69+
Both `Dispatcher` (for top-level outbound calls) and `DispatchContext`
70+
(for server-to-client calls made *during* an inbound request) satisfy this.
71+
"""
72+
73+
async def send_request(
74+
self,
75+
method: str,
76+
params: Mapping[str, Any] | None,
77+
opts: CallOptions | None = None,
78+
) -> dict[str, Any]: ...
79+
80+
81+
class DispatchContext(Protocol[TransportT_co]):
82+
"""Per-request context handed to ``on_call`` / ``on_notify``.
83+
84+
Carries the transport metadata for the inbound message and provides the
85+
back-channel for sending requests/notifications to the peer while handling
86+
it.
87+
"""
88+
89+
@property
90+
def transport(self) -> TransportT_co:
91+
"""Transport-specific metadata for this inbound message."""
92+
...
93+
94+
@property
95+
def cancel_requested(self) -> anyio.Event:
96+
"""Set when the peer sends ``notifications/cancelled`` for this request."""
97+
...
98+
99+
async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
100+
"""Send a notification to the peer."""
101+
...
102+
103+
async def send_request(
104+
self,
105+
method: str,
106+
params: Mapping[str, Any] | None,
107+
opts: CallOptions | None = None,
108+
) -> dict[str, Any]:
109+
"""Send a request to the peer on the back-channel and await its result.
110+
111+
Raises:
112+
NoBackChannelError: if ``transport.can_send_request`` is ``False``.
113+
"""
114+
...
115+
116+
async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
117+
"""Report progress for the inbound request, if the peer supplied a progress token.
118+
119+
A no-op when no token was supplied.
120+
"""
121+
...
122+
123+
124+
OnCall = Callable[[DispatchContext[TransportContext], str, Mapping[str, Any] | None], Awaitable[dict[str, Any]]]
125+
"""Handler for inbound requests: ``(ctx, method, params) -> result``. Raise ``MCPError`` to send an error response."""
126+
127+
OnNotify = Callable[[DispatchContext[TransportContext], str, Mapping[str, Any] | None], Awaitable[None]]
128+
"""Handler for inbound notifications: ``(ctx, method, params)``."""
129+
130+
DispatchMiddleware = Callable[[OnCall], OnCall]
131+
"""Wraps an ``OnCall`` to produce another ``OnCall``. Applied outermost-first."""
132+
133+
134+
class Dispatcher(Protocol[TransportT_co]):
135+
"""A duplex request/notification channel with call-return semantics.
136+
137+
Implementations own correlation of outbound calls to inbound results, the
138+
receive loop, per-request concurrency, and cancellation/progress wiring.
139+
"""
140+
141+
async def call(
142+
self,
143+
method: str,
144+
params: Mapping[str, Any] | None,
145+
opts: CallOptions | None = None,
146+
) -> dict[str, Any]:
147+
"""Send a request and await its result.
148+
149+
Raises:
150+
MCPError: If the peer responded with an error, or the handler
151+
raised. Implementations normalize all handler exceptions to
152+
`MCPError` so callers see a single exception type.
153+
"""
154+
...
155+
156+
async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
157+
"""Send a fire-and-forget notification."""
158+
...
159+
160+
async def run(self, on_call: OnCall, on_notify: OnNotify) -> None:
161+
"""Drive the receive loop until the underlying channel closes.
162+
163+
Each inbound request is dispatched to ``on_call`` in its own task; the
164+
returned dict (or raised ``MCPError``) is sent back as the response.
165+
Inbound notifications go to ``on_notify``.
166+
"""
167+
...

src/mcp/shared/exceptions.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import Any, cast
44

5-
from mcp.types import URL_ELICITATION_REQUIRED, ElicitRequestURLParams, ErrorData, JSONRPCError
5+
from mcp.types import INVALID_REQUEST, URL_ELICITATION_REQUIRED, ElicitRequestURLParams, ErrorData, JSONRPCError
66

77

88
class MCPError(Exception):
@@ -41,6 +41,25 @@ def __str__(self) -> str:
4141
return self.message
4242

4343

44+
class NoBackChannelError(MCPError):
45+
"""Raised when sending a server-initiated request over a transport that cannot deliver it.
46+
47+
Stateless HTTP and JSON-response-mode HTTP have no channel for the server to
48+
push requests (sampling, elicitation, roots/list) to the client. This is
49+
raised by `DispatchContext.send_request` when `transport.can_send_request`
50+
is ``False``, and serializes to an ``INVALID_REQUEST`` error response.
51+
"""
52+
53+
def __init__(self, method: str):
54+
super().__init__(
55+
code=INVALID_REQUEST,
56+
message=(
57+
f"Cannot send {method!r}: this transport context has no back-channel for server-initiated requests."
58+
),
59+
)
60+
self.method = method
61+
62+
4463
class StatelessModeNotSupported(RuntimeError):
4564
"""Raised when attempting to use a method that is not supported in stateless mode.
4665

0 commit comments

Comments
 (0)