diff --git a/pkg/transport/proxy/streamable/streamable_proxy.go b/pkg/transport/proxy/streamable/streamable_proxy.go index 39aa86cc1a..f57087eb08 100644 --- a/pkg/transport/proxy/streamable/streamable_proxy.go +++ b/pkg/transport/proxy/streamable/streamable_proxy.go @@ -46,6 +46,10 @@ const ( // does not affect responses, so SSE response streams are unaffected. Matches // the vMCP server default. defaultReadTimeout = 30 * time.Second + + // sseMessageEvent is the SSE event name used for JSON-RPC responses, matching + // the reference MCP server transports and ToolHive's own SSE transport. + sseMessageEvent = "message" ) // HTTPProxy implements a proxy for streamable HTTP transport. @@ -530,7 +534,7 @@ func (p *HTTPProxy) handleSingleRequestSSE( }, } if data, mErr := json.Marshal(errObj); mErr == nil { - if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { + if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", sseMessageEvent, data); err != nil { slog.Debug("failed to write error message", "error", err) return } @@ -546,7 +550,8 @@ func (p *HTTPProxy) handleSingleRequestSSE( return } // Write SSE event with the JSON-RPC response and flush - if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { //nolint:gosec // G705: SSE data from MCP protocol + //nolint:gosec // G705: SSE data from MCP protocol + if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", sseMessageEvent, data); err != nil { slog.Debug("failed to write response", "error", err) return } diff --git a/pkg/transport/proxy/streamable/streamable_proxy_spec_test.go b/pkg/transport/proxy/streamable/streamable_proxy_spec_test.go index 1574462661..0376d948ff 100644 --- a/pkg/transport/proxy/streamable/streamable_proxy_spec_test.go +++ b/pkg/transport/proxy/streamable/streamable_proxy_spec_test.go @@ -53,6 +53,39 @@ func startProxyWithBackend(t *testing.T, port int) (*HTTPProxy, context.Context, return proxy, ctx, cancel } +// TestSSEResponseIncludesEventName ensures SSE responses include an explicit +// event: message line, matching reference MCP server transports. Clients such +// as @ai-sdk/mcp only dispatch frames with event === "message". +func TestSSEResponseIncludesEventName(t *testing.T) { + t.Parallel() + + const port = 8110 + proxy, ctx, cancel := startProxyWithBackend(t, port) + defer cancel() + defer func() { _ = proxy.Stop(ctx) }() + + url := "http://127.0.0.1:8110" + StreamableHTTPEndpoint + + initJSON := `{"jsonrpc":"2.0","id":"1","method":"initialize","params":{"protocolVersion":"2025-03-26","clientInfo":{"name":"spec-test","version":"0.0.0"},"capabilities":{}}}` + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte(initJSON))) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json, text/event-stream") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "text/event-stream", resp.Header.Get("Content-Type")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + sseContent := string(body) + assert.Contains(t, sseContent, "event: message\n", "SSE frame should include explicit event name") + assert.Contains(t, sseContent, "data: ", "SSE frame should include JSON-RPC data line") +} + // TestGETReturns405 validates that GET on MCP endpoint returns 405 (server does not offer SSE here). func TestGETReturns405(t *testing.T) { t.Parallel()