Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/transport/proxy/streamable/streamable_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
// does not affect responses, so SSE response streams are unaffected. Matches
// the vMCP server default.
defaultReadTimeout = 30 * time.Second

// sseMessageEvent is the SSE event name used for JSON-RPC responses, matching
// the reference MCP server transports and ToolHive's own SSE transport.
sseMessageEvent = "message"
)

// HTTPProxy implements a proxy for streamable HTTP transport.
Expand Down Expand Up @@ -530,7 +534,7 @@ func (p *HTTPProxy) handleSingleRequestSSE(
},
}
if data, mErr := json.Marshal(errObj); mErr == nil {
if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", sseMessageEvent, data); err != nil {
slog.Debug("failed to write error message", "error", err)
return
}
Expand All @@ -546,7 +550,8 @@ func (p *HTTPProxy) handleSingleRequestSSE(
return
}
// Write SSE event with the JSON-RPC response and flush
if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { //nolint:gosec // G705: SSE data from MCP protocol
//nolint:gosec // G705: SSE data from MCP protocol
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", sseMessageEvent, data); err != nil {
slog.Debug("failed to write response", "error", err)
return
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/transport/proxy/streamable/streamable_proxy_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,39 @@ func startProxyWithBackend(t *testing.T, port int) (*HTTPProxy, context.Context,
return proxy, ctx, cancel
}

// TestSSEResponseIncludesEventName ensures SSE responses include an explicit
// event: message line, matching reference MCP server transports. Clients such
// as @ai-sdk/mcp only dispatch frames with event === "message".
func TestSSEResponseIncludesEventName(t *testing.T) {
t.Parallel()

const port = 8110
proxy, ctx, cancel := startProxyWithBackend(t, port)
defer cancel()
defer func() { _ = proxy.Stop(ctx) }()

url := "http://127.0.0.1:8110" + StreamableHTTPEndpoint

initJSON := `{"jsonrpc":"2.0","id":"1","method":"initialize","params":{"protocolVersion":"2025-03-26","clientInfo":{"name":"spec-test","version":"0.0.0"},"capabilities":{}}}`
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte(initJSON)))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json, text/event-stream")

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "text/event-stream", resp.Header.Get("Content-Type"))

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
sseContent := string(body)
assert.Contains(t, sseContent, "event: message\n", "SSE frame should include explicit event name")
assert.Contains(t, sseContent, "data: ", "SSE frame should include JSON-RPC data line")
}

// TestGETReturns405 validates that GET on MCP endpoint returns 405 (server does not offer SSE here).
func TestGETReturns405(t *testing.T) {
t.Parallel()
Expand Down
Loading