From 8ce251aa4a41b3ae25b78a7a08c852f099d12634 Mon Sep 17 00:00:00 2001 From: nigel brown Date: Fri, 7 Nov 2025 15:12:56 +0000 Subject: [PATCH 01/10] fix: wait for initialize endpoint before updating client configs This ensures MCP servers are fully ready before notifying clients, preventing connection timing issues. The proxy now: 1. Starts the container and proxy 2. Repeatedly calls the initialize endpoint until it succeeds (HTTP 200) 3. Waits up to 5 minutes with exponential backoff (100ms-2s) 4. Only then updates client configurations For streamable-http transport, the proxy calls POST /mcp with proper headers (Accept: application/json, text/event-stream) to ensure compatibility with servers that require both content types. Fixes #2499 --- pkg/runner/runner.go | 132 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 124 insertions(+), 8 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 48b37f8b7..b684e48bc 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -2,11 +2,13 @@ package runner import ( + "bytes" "context" "fmt" "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -274,6 +276,24 @@ func (r *Runner) Run(ctx context.Context) error { logger.Infof("MCP server %s started successfully", r.Config.ContainerName) + // Wait for the MCP server to accept initialize requests before updating client configurations. + // This prevents timing issues where clients try to connect before the server is fully ready. + // We repeatedly call initialize until it succeeds (up to 5 minutes). + transportType := labels.GetTransportType(r.Config.ContainerLabels) + serverURL := transport.GenerateMCPServerURL( + transportType, + "localhost", + r.Config.Port, + r.Config.ContainerName, + r.Config.RemoteURL) + + // Repeatedly try calling initialize until it succeeds (up to 5 minutes) + // Some servers (like mcp-optimizer) can take significant time to start up + if err := waitForInitializeSuccess(ctx, serverURL, transportType, 5*time.Minute); err != nil { + logger.Warnf("Warning: Initialize not successful, but continuing: %v", err) + // Continue anyway to maintain backward compatibility, but log a warning + } + // Update client configurations with the MCP server URL. // Note that this function checks the configuration to determine which // clients should be updated, if any. @@ -281,14 +301,6 @@ func (r *Runner) Run(ctx context.Context) error { if err != nil { logger.Warnf("Warning: Failed to create client manager: %v", err) } else { - transportType := labels.GetTransportType(r.Config.ContainerLabels) - serverURL := transport.GenerateMCPServerURL( - transportType, - "localhost", - r.Config.Port, - r.Config.ContainerName, - r.Config.RemoteURL) - if err := clientManager.AddServerToClients(ctx, r.Config.ContainerName, serverURL, transportType, r.Config.Group); err != nil { logger.Warnf("Warning: Failed to add server to client configurations: %v", err) } @@ -450,3 +462,107 @@ func (r *Runner) Cleanup(ctx context.Context) error { return lastErr } + +// waitForInitializeSuccess repeatedly calls the MCP server's initialize endpoint until it succeeds. +// This prevents timing issues where clients try to connect before the server is fully ready. +// It makes repeated attempts with exponential backoff up to a maximum timeout. +func waitForInitializeSuccess(ctx context.Context, serverURL, transportType string, maxWaitTime time.Duration) error { + // Construct the endpoint URL based on transport type + var endpoint string + switch transportType { + case "streamable-http", "streamable": + // For streamable-http, serverURL already contains the /mcp path + // Format: http://localhost:port/mcp + endpoint = serverURL + case "sse", "stdio": + // For SSE/stdio, serverURL contains /sse#container-name + // Format: http://localhost:port/sse#container-name + // We need to change it to /messages (without the fragment) + endpoint = serverURL + // Remove fragment if present (everything after #) + if idx := strings.Index(endpoint, "#"); idx != -1 { + endpoint = endpoint[:idx] + } + // Replace /sse with /messages for the initialize call + endpoint = strings.Replace(endpoint, "/sse", "/messages", 1) + default: + // For other transports, no HTTP initialize check is needed + logger.Debugf("Skipping initialize check for transport type: %s", transportType) + return nil + } + + // Create the initialize request payload + initPayload := `{"jsonrpc":"2.0","method":"initialize","id":"toolhive-init-check",` + + `"params":{"protocolVersion":"2024-11-05","capabilities":{},` + + `"clientInfo":{"name":"toolhive","version":"1.0"}}}` + + // Setup retry logic with exponential backoff + startTime := time.Now() + attempt := 0 + baseDelay := 100 * time.Millisecond + maxDelay := 2 * time.Second // Cap at 2 seconds between retries + + logger.Infof("Waiting for MCP server to accept initialize requests at %s (timeout: %v)", endpoint, maxWaitTime) + + for { + attempt++ + + // Create a new HTTP client with a reasonable timeout for each request + httpClient := &http.Client{ + Timeout: 10 * time.Second, + } + + // Make the initialize request + req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBufferString(initPayload)) + if err != nil { + logger.Debugf("Failed to create initialize request (attempt %d): %v", attempt, err) + } else { + req.Header.Set("Content-Type", "application/json") + // MCP servers may require clients to accept both JSON and SSE responses + req.Header.Set("Accept", "application/json, text/event-stream") + req.Header.Set("MCP-Protocol-Version", "2024-11-05") + + resp, err := httpClient.Do(req) + if err == nil { + //nolint:errcheck // Ignoring close error on response body in error path + defer resp.Body.Close() + + // Accept 200 OK as success + if resp.StatusCode == http.StatusOK { + elapsed := time.Since(startTime) + logger.Infof("MCP server successfully responded to initialize after %v (attempt %d)", elapsed, attempt) + return nil + } + + logger.Debugf("Initialize returned status %d (attempt %d)", resp.StatusCode, attempt) + } else { + logger.Debugf("Failed to reach initialize endpoint (attempt %d): %v", attempt, err) + } + } + + // Check if we've exceeded the maximum wait time + elapsed := time.Since(startTime) + if elapsed >= maxWaitTime { + return fmt.Errorf("initialize not successful after %v (%d attempts)", elapsed, attempt) + } + + // Calculate delay with exponential backoff + // Cap the exponent to prevent overflow + exp := attempt - 1 + if exp > 10 { + exp = 10 + } + delay := baseDelay * time.Duration(1< maxDelay { + delay = maxDelay + } + + // Wait before retrying + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled while waiting for initialize") + case <-time.After(delay): + // Continue to next attempt + } + } +} From 441004dfc2e2e7717d8a35c99dba03faf491040c Mon Sep 17 00:00:00 2001 From: nigel brown Date: Fri, 7 Nov 2025 15:56:51 +0000 Subject: [PATCH 02/10] Fix whitespace Signed-off-by: nigel brown --- pkg/runner/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index b684e48bc..36ff14df0 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -526,7 +526,7 @@ func waitForInitializeSuccess(ctx context.Context, serverURL, transportType stri if err == nil { //nolint:errcheck // Ignoring close error on response body in error path defer resp.Body.Close() - + // Accept 200 OK as success if resp.StatusCode == http.StatusOK { elapsed := time.Since(startTime) From c2218749455484eaf3f42c26dca575bb281262e0 Mon Sep 17 00:00:00 2001 From: nigel brown Date: Fri, 7 Nov 2025 16:00:21 +0000 Subject: [PATCH 03/10] Fix SSE Signed-off-by: nigel brown --- pkg/runner/runner.go | 66 ++++++++++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 36ff14df0..89072d549 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -463,46 +463,49 @@ func (r *Runner) Cleanup(ctx context.Context) error { return lastErr } -// waitForInitializeSuccess repeatedly calls the MCP server's initialize endpoint until it succeeds. +// waitForInitializeSuccess repeatedly checks if the MCP server is ready to accept requests. // This prevents timing issues where clients try to connect before the server is fully ready. // It makes repeated attempts with exponential backoff up to a maximum timeout. func waitForInitializeSuccess(ctx context.Context, serverURL, transportType string, maxWaitTime time.Duration) error { - // Construct the endpoint URL based on transport type + // Determine the endpoint and method to use based on transport type var endpoint string + var method string + var payload string + switch transportType { case "streamable-http", "streamable": - // For streamable-http, serverURL already contains the /mcp path + // For streamable-http, send initialize request to /mcp endpoint // Format: http://localhost:port/mcp endpoint = serverURL + method = "POST" + payload = `{"jsonrpc":"2.0","method":"initialize","id":"toolhive-init-check",` + + `"params":{"protocolVersion":"2024-11-05","capabilities":{},` + + `"clientInfo":{"name":"toolhive","version":"1.0"}}}` case "sse", "stdio": - // For SSE/stdio, serverURL contains /sse#container-name - // Format: http://localhost:port/sse#container-name - // We need to change it to /messages (without the fragment) + // For SSE/stdio, just check if the SSE endpoint is available + // We can't easily call initialize without establishing a full SSE connection, + // so we just verify the endpoint responds. + // Format: http://localhost:port/sse#container-name -> http://localhost:port/sse endpoint = serverURL // Remove fragment if present (everything after #) if idx := strings.Index(endpoint, "#"); idx != -1 { endpoint = endpoint[:idx] } - // Replace /sse with /messages for the initialize call - endpoint = strings.Replace(endpoint, "/sse", "/messages", 1) + method = "GET" + payload = "" default: - // For other transports, no HTTP initialize check is needed - logger.Debugf("Skipping initialize check for transport type: %s", transportType) + // For other transports, no HTTP check is needed + logger.Debugf("Skipping readiness check for transport type: %s", transportType) return nil } - // Create the initialize request payload - initPayload := `{"jsonrpc":"2.0","method":"initialize","id":"toolhive-init-check",` + - `"params":{"protocolVersion":"2024-11-05","capabilities":{},` + - `"clientInfo":{"name":"toolhive","version":"1.0"}}}` - // Setup retry logic with exponential backoff startTime := time.Now() attempt := 0 baseDelay := 100 * time.Millisecond maxDelay := 2 * time.Second // Cap at 2 seconds between retries - logger.Infof("Waiting for MCP server to accept initialize requests at %s (timeout: %v)", endpoint, maxWaitTime) + logger.Infof("Waiting for MCP server to be ready at %s (timeout: %v)", endpoint, maxWaitTime) for { attempt++ @@ -512,31 +515,40 @@ func waitForInitializeSuccess(ctx context.Context, serverURL, transportType stri Timeout: 10 * time.Second, } - // Make the initialize request - req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBufferString(initPayload)) + // Make the readiness check request + var req *http.Request + var err error + if payload != "" { + req, err = http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBufferString(payload)) + } else { + req, err = http.NewRequestWithContext(ctx, method, endpoint, nil) + } + if err != nil { - logger.Debugf("Failed to create initialize request (attempt %d): %v", attempt, err) + logger.Debugf("Failed to create request (attempt %d): %v", attempt, err) } else { - req.Header.Set("Content-Type", "application/json") - // MCP servers may require clients to accept both JSON and SSE responses - req.Header.Set("Accept", "application/json, text/event-stream") - req.Header.Set("MCP-Protocol-Version", "2024-11-05") + if method == "POST" { + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json, text/event-stream") + req.Header.Set("MCP-Protocol-Version", "2024-11-05") + } resp, err := httpClient.Do(req) if err == nil { //nolint:errcheck // Ignoring close error on response body in error path defer resp.Body.Close() - // Accept 200 OK as success + // For GET (SSE), accept 200 OK + // For POST (streamable-http), also accept 200 OK if resp.StatusCode == http.StatusOK { elapsed := time.Since(startTime) - logger.Infof("MCP server successfully responded to initialize after %v (attempt %d)", elapsed, attempt) + logger.Infof("MCP server is ready after %v (attempt %d)", elapsed, attempt) return nil } - logger.Debugf("Initialize returned status %d (attempt %d)", resp.StatusCode, attempt) + logger.Debugf("Server returned status %d (attempt %d)", resp.StatusCode, attempt) } else { - logger.Debugf("Failed to reach initialize endpoint (attempt %d): %v", attempt, err) + logger.Debugf("Failed to reach endpoint (attempt %d): %v", attempt, err) } } From 53af5c439d3940fea9c3183a0ec171819e05d8ce Mon Sep 17 00:00:00 2001 From: nigel brown Date: Fri, 7 Nov 2025 16:52:41 +0000 Subject: [PATCH 04/10] better initialization Signed-off-by: nigel brown --- pkg/runner/runner.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 89072d549..3c19f8b64 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -507,14 +507,14 @@ func waitForInitializeSuccess(ctx context.Context, serverURL, transportType stri logger.Infof("Waiting for MCP server to be ready at %s (timeout: %v)", endpoint, maxWaitTime) + // Create HTTP client with a reasonable timeout for requests + httpClient := &http.Client{ + Timeout: 10 * time.Second, + } + for { attempt++ - // Create a new HTTP client with a reasonable timeout for each request - httpClient := &http.Client{ - Timeout: 10 * time.Second, - } - // Make the readiness check request var req *http.Request var err error From 4cb32b235d9612ab128080a3eb6e0eec73a694fb Mon Sep 17 00:00:00 2001 From: nigel brown Date: Fri, 7 Nov 2025 16:55:03 +0000 Subject: [PATCH 05/10] save a variable Signed-off-by: nigel brown --- pkg/runner/runner.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 3c19f8b64..35dc11e4e 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -502,7 +502,7 @@ func waitForInitializeSuccess(ctx context.Context, serverURL, transportType stri // Setup retry logic with exponential backoff startTime := time.Now() attempt := 0 - baseDelay := 100 * time.Millisecond + delay := 100 * time.Millisecond maxDelay := 2 * time.Second // Cap at 2 seconds between retries logger.Infof("Waiting for MCP server to be ready at %s (timeout: %v)", endpoint, maxWaitTime) @@ -558,17 +558,6 @@ func waitForInitializeSuccess(ctx context.Context, serverURL, transportType stri return fmt.Errorf("initialize not successful after %v (%d attempts)", elapsed, attempt) } - // Calculate delay with exponential backoff - // Cap the exponent to prevent overflow - exp := attempt - 1 - if exp > 10 { - exp = 10 - } - delay := baseDelay * time.Duration(1< maxDelay { - delay = maxDelay - } - // Wait before retrying select { case <-ctx.Done(): @@ -576,5 +565,11 @@ func waitForInitializeSuccess(ctx context.Context, serverURL, transportType stri case <-time.After(delay): // Continue to next attempt } + + // Update delay for next iteration with exponential backoff + delay *= 2 + if delay > maxDelay { + delay = maxDelay + } } } From 9e610d04b9f64f63a9427806d78f8bb66b5eaf35 Mon Sep 17 00:00:00 2001 From: nigel brown Date: Fri, 7 Nov 2025 17:00:16 +0000 Subject: [PATCH 06/10] The new code appears slower because it waits for init. Longer timeout Signed-off-by: nigel brown --- test/e2e/osv_mcp_server_test.go | 26 +++++++++++++------------- test/e2e/proxy_oauth_test.go | 8 ++++---- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/test/e2e/osv_mcp_server_test.go b/test/e2e/osv_mcp_server_test.go index 9488a52f0..80c5b804d 100644 --- a/test/e2e/osv_mcp_server_test.go +++ b/test/e2e/osv_mcp_server_test.go @@ -58,9 +58,9 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { // The command should indicate success Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the OSV server") - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) - Expect(err).ToNot(HaveOccurred(), "Server should be running within 60 seconds") + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "Server should be running within 5 minutes") By("Verifying the server appears in the list with SSE transport") stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess() @@ -78,7 +78,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { "osv").ExpectSuccess() By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) Expect(err).ToNot(HaveOccurred()) By("Getting the server URL") @@ -128,7 +128,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { "osv").ExpectSuccess() By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) Expect(err).ToNot(HaveOccurred()) By("Getting the server URL") @@ -136,7 +136,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { Expect(err).ToNot(HaveOccurred()) By("Waiting for MCP server to be ready") - err = e2e.WaitForMCPServerReady(config, serverURL, "sse", 60*time.Second) + err = e2e.WaitForMCPServerReady(config, serverURL, "sse", 5*time.Minute) Expect(err).ToNot(HaveOccurred(), "MCP server should be ready for protocol operations") By("Creating MCP client and initializing connection") @@ -181,14 +181,14 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { "--name", serverName, "--transport", "sse", "osv").ExpectSuccess() - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) Expect(err).ToNot(HaveOccurred()) // Get server URL serverURL, err = e2e.GetMCPServerURL(config, serverName) Expect(err).ToNot(HaveOccurred()) - err = e2e.WaitForMCPServerReady(config, serverURL, "sse", 60*time.Second) + err = e2e.WaitForMCPServerReady(config, serverURL, "sse", 5*time.Minute) Expect(err).ToNot(HaveOccurred()) }) @@ -327,7 +327,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { "--name", serverName, "--transport", "sse", "osv").ExpectSuccess() - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) Expect(err).ToNot(HaveOccurred()) }) @@ -361,7 +361,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { Expect(stdout).To(ContainSubstring(serverName)) By("Waiting for the server to be running again") - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) Expect(err).ToNot(HaveOccurred()) By("Verifying SSE endpoint is accessible again") @@ -428,7 +428,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { "--transport", "sse", "osv").ExpectSuccess() // ensure it's actually up before attempting the duplicate - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) Expect(err).ToNot(HaveOccurred(), "first server should start") By("Attempting to start a second server with the same name") @@ -489,7 +489,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { // 2) Wait until the server is reported as running. By("waiting for foreground server to be running") - err := e2e.WaitForMCPServer(config, serverName, 60*time.Second) + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) Expect(err).ToNot(HaveOccurred(), "server should reach running state") // 3) Verify workload is running via workload manager @@ -516,7 +516,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { Expect(stdout).To(ContainSubstring("running"), "server should be running") if serverURL, gerr := e2e.GetMCPServerURL(config, serverName); gerr == nil { - rerr := e2e.WaitForMCPServerReady(config, serverURL, "sse", 15*time.Second) + rerr := e2e.WaitForMCPServerReady(config, serverURL, "sse", 5*time.Minute) Expect(rerr).ToNot(HaveOccurred(), "server should be protocol-ready") } diff --git a/test/e2e/proxy_oauth_test.go b/test/e2e/proxy_oauth_test.go index e1fcc06b5..95572299b 100644 --- a/test/e2e/proxy_oauth_test.go +++ b/test/e2e/proxy_oauth_test.go @@ -82,7 +82,7 @@ var _ = Describe("Proxy OAuth Authentication E2E", Label("proxy", "oauth", "e2e" // Wait for OIDC server to be ready Eventually(func() error { return checkServerHealth(fmt.Sprintf("%s/.well-known/openid-configuration", mockOIDCBaseURL)) - }, 30*time.Second, 1*time.Second).Should(Succeed()) + }, 5*time.Minute, 1*time.Second).Should(Succeed()) // Start OSV MCP server that will be our target By("Starting OSV MCP server as target") @@ -92,7 +92,7 @@ var _ = Describe("Proxy OAuth Authentication E2E", Label("proxy", "oauth", "e2e" "osv").ExpectSuccess() // Wait for OSV server to be ready - err = e2e.WaitForMCPServer(config, osvServerName, 60*time.Second) + err = e2e.WaitForMCPServer(config, osvServerName, 5*time.Minute) Expect(err).ToNot(HaveOccurred()) }) @@ -376,7 +376,7 @@ var _ = Describe("Proxy OAuth Authentication E2E", Label("proxy", "oauth", "e2e" proxyURL := fmt.Sprintf("http://localhost:%d/mcp", proxyPort) // Wait for proxy to be ready for MCP connections - err = e2e.WaitForMCPServerReady(config, proxyURL, "streamable-http", 60*time.Second) + err = e2e.WaitForMCPServerReady(config, proxyURL, "streamable-http", 5*time.Minute) if err != nil { GinkgoWriter.Printf("MCP connection through proxy failed: %v\n", err) Skip("Skipping MCP test due to proxy not being ready") @@ -443,7 +443,7 @@ var _ = Describe("Proxy OAuth Authentication E2E", Label("proxy", "oauth", "e2e" By("Reconnecting via MCP to trigger token refresh") proxyURL := fmt.Sprintf("http://localhost:%d/mcp", proxyPort) - err = e2e.WaitForMCPServerReady(config, proxyURL, "streamable-http", 10*time.Second) + err = e2e.WaitForMCPServerReady(config, proxyURL, "streamable-http", 5*time.Minute) Expect(err).ToNot(HaveOccurred(), "MCP server not ready after token expiry") mcpClient, err := e2e.NewMCPClientForStreamableHTTP(config, proxyURL) From df9934361b78a888277153523123f36ae05ad339 Mon Sep 17 00:00:00 2001 From: nigel brown Date: Fri, 7 Nov 2025 17:14:16 +0000 Subject: [PATCH 07/10] fix: gofmt indentation for osv_mcp_server_test.go Signed-off-by: nigel brown --- test/e2e/osv_mcp_server_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/osv_mcp_server_test.go b/test/e2e/osv_mcp_server_test.go index 80c5b804d..d5da1761b 100644 --- a/test/e2e/osv_mcp_server_test.go +++ b/test/e2e/osv_mcp_server_test.go @@ -58,9 +58,9 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { // The command should indicate success Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the OSV server") - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred(), "Server should be running within 5 minutes") + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "Server should be running within 5 minutes") By("Verifying the server appears in the list with SSE transport") stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess() From 0220cee01ab5787514dd852eb6ed6427c4d9b2b4 Mon Sep 17 00:00:00 2001 From: nigel brown Date: Sat, 8 Nov 2025 06:51:08 +0000 Subject: [PATCH 08/10] Unrelated registry change The OSV server transport changed from sse to streamable-http on November 4, 2025 in commit 31f2d8b24 (PR #2441). Signed-off-by: nigel brown --- test/e2e/osv_mcp_server_test.go | 730 ++++++++++++++++---------------- 1 file changed, 365 insertions(+), 365 deletions(-) diff --git a/test/e2e/osv_mcp_server_test.go b/test/e2e/osv_mcp_server_test.go index d5da1761b..a3d574848 100644 --- a/test/e2e/osv_mcp_server_test.go +++ b/test/e2e/osv_mcp_server_test.go @@ -20,7 +20,7 @@ func generateUniqueServerName(prefix string) string { return fmt.Sprintf("%s-%d-%d-%d", prefix, os.Getpid(), time.Now().UnixNano(), GinkgoRandomSeed()) } -var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { +var _ = Describe("OsvMcpServer", Label("mcp", "streamable-http", "e2e"), Serial, func() { var config *e2e.TestConfig BeforeEach(func() { @@ -31,7 +31,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { Expect(err).ToNot(HaveOccurred(), "thv binary should be available") }) - Describe("Running OSV MCP server with SSE transport", func() { + Describe("Running OSV MCP server with streamable-http transport", func() { Context("when starting the server from registry", func() { var serverName string @@ -47,338 +47,338 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { } }) - It("should successfully start and be accessible via SSE [Serial]", func() { - By("Starting the OSV MCP server with SSE transport and audit enabled") - stdout, stderr := e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "sse", - "--enable-audit", - "osv").ExpectSuccess() - - // The command should indicate success - Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the OSV server") - - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred(), "Server should be running within 5 minutes") - - By("Verifying the server appears in the list with SSE transport") - stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName), "Server should appear in the list") - Expect(stdout).To(ContainSubstring("running"), "Server should be in running state") - Expect(stdout).To(ContainSubstring("sse"), "Server should show SSE transport") - }) + It("should successfully start and be accessible via streamable-http [Serial]", func() { + By("Starting the OSV MCP server with streamable-http transport and audit enabled") + stdout, stderr := e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "--enable-audit", + "osv").ExpectSuccess() - It("should be accessible via HTTP SSE endpoint [Serial]", func() { - By("Starting the OSV MCP server with audit enabled") - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "sse", - "--enable-audit", - "osv").ExpectSuccess() + // The command should indicate success + Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the OSV server") - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "Server should be running within 5 minutes") - By("Getting the server URL") - serverURL, err := e2e.GetMCPServerURL(config, serverName) - Expect(err).ToNot(HaveOccurred(), "Should be able to get server URL") - Expect(serverURL).To(ContainSubstring("http"), "URL should be HTTP-based") - Expect(serverURL).To(ContainSubstring("/sse"), "URL should contain SSE endpoint") + By("Verifying the server appears in the list with streamable-http transport") + stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName), "Server should appear in the list") + Expect(stdout).To(ContainSubstring("running"), "Server should be in running state") + Expect(stdout).To(ContainSubstring("mcp"), "Server should show mcp endpoint") + }) - By("Waiting before starting the HTTP request") - time.Sleep(10 * time.Second) + It("should be accessible via HTTP streamable-http endpoint [Serial]", func() { + By("Starting the OSV MCP server with audit enabled") + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "--enable-audit", + "osv").ExpectSuccess() - By("Making an HTTP request to the SSE endpoint") - - client := &http.Client{Timeout: 10 * time.Second} - var resp *http.Response - var httpErr error - - maxRetries := 5 - for i := 0; i < maxRetries; i++ { - req, err := http.NewRequest("GET", serverURL, nil) - Expect(err).ToNot(HaveOccurred()) - req.Header.Set("Accept", "text/event-stream") - - resp, httpErr = client.Do(req) - if httpErr == nil && resp.StatusCode >= 200 && resp.StatusCode < 500 { - break - } - if resp != nil { - resp.Body.Close() - } - time.Sleep(10 * time.Second) - } + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) - Expect(httpErr).ToNot(HaveOccurred(), "Should be able to connect to SSE endpoint") - Expect(resp).ToNot(BeNil(), "Response should not be nil") - defer resp.Body.Close() + By("Getting the server URL") + serverURL, err := e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred(), "Should be able to get server URL") + Expect(serverURL).To(ContainSubstring("http"), "URL should be HTTP-based") + Expect(serverURL).To(ContainSubstring("/mcp"), "URL should contain MCP endpoint") - Expect(resp.StatusCode).To(BeNumerically(">=", 200), "Should get a valid HTTP response") - Expect(resp.StatusCode).To(BeNumerically("<", 500), "Should not get a server error") - }) + By("Waiting before starting the HTTP request") + time.Sleep(10 * time.Second) - It("should respond to proper MCP protocol operations [Serial]", func() { - By("Starting the OSV MCP server") - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "sse", - "osv").ExpectSuccess() + By("Making an HTTP request to the streamable-http endpoint") - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) + client := &http.Client{Timeout: 10 * time.Second} + var resp *http.Response + var httpErr error - By("Getting the server URL") - serverURL, err := e2e.GetMCPServerURL(config, serverName) + maxRetries := 5 + for i := 0; i < maxRetries; i++ { + req, err := http.NewRequest("GET", serverURL, nil) Expect(err).ToNot(HaveOccurred()) + req.Header.Set("Accept", "text/event-stream") - By("Waiting for MCP server to be ready") - err = e2e.WaitForMCPServerReady(config, serverURL, "sse", 5*time.Minute) - Expect(err).ToNot(HaveOccurred(), "MCP server should be ready for protocol operations") + resp, httpErr = client.Do(req) + if httpErr == nil && resp.StatusCode >= 200 && resp.StatusCode < 500 { + break + } + if resp != nil { + resp.Body.Close() + } + time.Sleep(10 * time.Second) + } - By("Creating MCP client and initializing connection") - mcpClient, err := e2e.NewMCPClientForSSE(config, serverURL) - Expect(err).ToNot(HaveOccurred(), "Should be able to create MCP client") - defer mcpClient.Close() + Expect(httpErr).ToNot(HaveOccurred(), "Should be able to connect to streamable-http endpoint") + Expect(resp).ToNot(BeNil(), "Response should not be nil") + defer resp.Body.Close() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + Expect(resp.StatusCode).To(BeNumerically(">=", 200), "Should get a valid HTTP response") + Expect(resp.StatusCode).To(BeNumerically("<", 500), "Should not get a server error") + }) - err = mcpClient.Initialize(ctx) - Expect(err).ToNot(HaveOccurred(), "Should be able to initialize MCP connection") + It("should respond to proper MCP protocol operations [Serial]", func() { + By("Starting the OSV MCP server") + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "osv").ExpectSuccess() - By("Testing basic MCP operations") - err = mcpClient.Ping(ctx) - Expect(err).ToNot(HaveOccurred(), "Should be able to ping the server") + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) - By("Listing available tools") - tools, err := mcpClient.ListTools(ctx) - Expect(err).ToNot(HaveOccurred(), "Should be able to list tools") - Expect(tools.Tools).ToNot(BeEmpty(), "OSV server should provide tools") + By("Getting the server URL") + serverURL, err := e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred()) - GinkgoWriter.Printf("Available tools: %d\n", len(tools.Tools)) - for _, tool := range tools.Tools { - GinkgoWriter.Printf(" - %s: %s\n", tool.Name, tool.Description) - } - }) - }) + By("Waiting for MCP server to be ready") + err = e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "MCP server should be ready for protocol operations") - Context("when testing OSV-specific functionality", Ordered, func() { - var mcpClient *e2e.MCPClientHelper - var serverURL string - var cancel context.CancelFunc - var serverName string + By("Creating MCP client and initializing connection") + mcpClient, err := e2e.NewMCPClientForStreamableHTTP(config, serverURL) + Expect(err).ToNot(HaveOccurred(), "Should be able to create MCP client") + defer mcpClient.Close() - BeforeAll(func() { - // Generate unique server name for this context - serverName = generateUniqueServerName("osv-functionality-test") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - // Start ONE server for ALL OSV-specific tests - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "sse", - "osv").ExpectSuccess() - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) + err = mcpClient.Initialize(ctx) + Expect(err).ToNot(HaveOccurred(), "Should be able to initialize MCP connection") - // Get server URL - serverURL, err = e2e.GetMCPServerURL(config, serverName) - Expect(err).ToNot(HaveOccurred()) + By("Testing basic MCP operations") + err = mcpClient.Ping(ctx) + Expect(err).ToNot(HaveOccurred(), "Should be able to ping the server") - err = e2e.WaitForMCPServerReady(config, serverURL, "sse", 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - }) + By("Listing available tools") + tools, err := mcpClient.ListTools(ctx) + Expect(err).ToNot(HaveOccurred(), "Should be able to list tools") + Expect(tools.Tools).ToNot(BeEmpty(), "OSV server should provide tools") - BeforeEach(func() { - // Create fresh MCP client for each test - var err error - mcpClient, err = e2e.NewMCPClientForSSE(config, serverURL) - Expect(err).ToNot(HaveOccurred()) + GinkgoWriter.Printf("Available tools: %d\n", len(tools.Tools)) + for _, tool := range tools.Tools { + GinkgoWriter.Printf(" - %s: %s\n", tool.Name, tool.Description) + } + }) + }) - // Create context that will be cancelled in AfterEach - ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) - cancel = cancelFunc - err = mcpClient.Initialize(ctx) - Expect(err).ToNot(HaveOccurred()) - }) + Context("when testing OSV-specific functionality", Ordered, func() { + var mcpClient *e2e.MCPClientHelper + var serverURL string + var cancel context.CancelFunc + var serverName string - AfterEach(func() { - if cancel != nil { - cancel() - } - if mcpClient != nil { - mcpClient.Close() - } - }) + BeforeAll(func() { + // Generate unique server name for this context + serverName = generateUniqueServerName("osv-functionality-test") - AfterAll(func() { - if config.CleanupAfter { - // Clean up the shared server after all tests - err := e2e.StopAndRemoveMCPServer(config, serverName) - Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") - } - }) + // Start ONE server for ALL OSV-specific tests + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "osv").ExpectSuccess() + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) - It("should be listed in registry with OSV-specific information [Serial]", func() { - By("Getting OSV server info from registry") - stdout, _ := e2e.NewTHVCommand(config, "registry", "info", "osv").ExpectSuccess() - Expect(stdout).To(ContainSubstring("osv"), "Info should be about OSV server") - Expect(stdout).To(ContainSubstring("vulnerability"), "Info should mention vulnerability scanning") - }) + // Get server URL + serverURL, err = e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred()) - It("should provide vulnerability query tools [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + err = e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) + }) - By("Listing available tools") - mcpClient.ExpectToolExists(ctx, "query_vulnerability") + BeforeEach(func() { + // Create fresh MCP client for each test + var err error + mcpClient, err = e2e.NewMCPClientForStreamableHTTP(config, serverURL) + Expect(err).ToNot(HaveOccurred()) + + // Create context that will be cancelled in AfterEach + ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) + cancel = cancelFunc + err = mcpClient.Initialize(ctx) + Expect(err).ToNot(HaveOccurred()) + }) - By("Testing vulnerability query with a known package") - // Test with a well-known package that should have vulnerabilities - arguments := map[string]interface{}{ - "package_name": "lodash", - "ecosystem": "npm", - "version": "4.17.15", // Known vulnerable version from OSV docs - } + AfterEach(func() { + if cancel != nil { + cancel() + } + if mcpClient != nil { + mcpClient.Close() + } + }) - result := mcpClient.ExpectToolCall(ctx, "query_vulnerability", arguments) - Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability information") + AfterAll(func() { + if config.CleanupAfter { + // Clean up the shared server after all tests + err := e2e.StopAndRemoveMCPServer(config, serverName) + Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") + } + }) - GinkgoWriter.Printf("Vulnerability query result: %+v\n", result.Content) - }) + It("should be listed in registry with OSV-specific information [Serial]", func() { + By("Getting OSV server info from registry") + stdout, _ := e2e.NewTHVCommand(config, "registry", "info", "osv").ExpectSuccess() + Expect(stdout).To(ContainSubstring("osv"), "Info should be about OSV server") + Expect(stdout).To(ContainSubstring("vulnerability"), "Info should mention vulnerability scanning") + }) - It("should handle batch vulnerability queries [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - By("Testing batch vulnerability query") - mcpClient.ExpectToolExists(ctx, "query_vulnerabilities_batch") - - arguments := map[string]interface{}{ - "queries": []map[string]interface{}{ - { - "package_name": "lodash", - "ecosystem": "npm", - "version": "4.17.15", - }, - { - "package_name": "jinja2", - "ecosystem": "PyPI", - "version": "2.4.1", - }, - }, - } + It("should provide vulnerability query tools [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - result := mcpClient.ExpectToolCall(ctx, "query_vulnerabilities_batch", arguments) - Expect(result.Content).ToNot(BeEmpty(), "Should return batch vulnerability information") + By("Listing available tools") + mcpClient.ExpectToolExists(ctx, "query_vulnerability") - GinkgoWriter.Printf("Batch vulnerability query result: %+v\n", result.Content) - }) + By("Testing vulnerability query with a known package") + // Test with a well-known package that should have vulnerabilities + arguments := map[string]interface{}{ + "package_name": "lodash", + "ecosystem": "npm", + "version": "4.17.15", // Known vulnerable version from OSV docs + } - It("should get vulnerability details by ID [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + result := mcpClient.ExpectToolCall(ctx, "query_vulnerability", arguments) + Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability information") - By("Testing get vulnerability by ID") - mcpClient.ExpectToolExists(ctx, "get_vulnerability") + GinkgoWriter.Printf("Vulnerability query result: %+v\n", result.Content) + }) - arguments := map[string]interface{}{ - "id": "GHSA-vqj2-4v8m-8vrq", // Example from OSV docs - } + It("should handle batch vulnerability queries [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - result := mcpClient.ExpectToolCall(ctx, "get_vulnerability", arguments) - Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability details") + By("Testing batch vulnerability query") + mcpClient.ExpectToolExists(ctx, "query_vulnerabilities_batch") - GinkgoWriter.Printf("Vulnerability details result: %+v\n", result.Content) - }) + arguments := map[string]interface{}{ + "queries": []map[string]interface{}{ + { + "package_name": "lodash", + "ecosystem": "npm", + "version": "4.17.15", + }, + { + "package_name": "jinja2", + "ecosystem": "PyPI", + "version": "2.4.1", + }, + }, + } + + result := mcpClient.ExpectToolCall(ctx, "query_vulnerabilities_batch", arguments) + Expect(result.Content).ToNot(BeEmpty(), "Should return batch vulnerability information") + + GinkgoWriter.Printf("Batch vulnerability query result: %+v\n", result.Content) + }) - It("should handle invalid vulnerability queries gracefully [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + It("should get vulnerability details by ID [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - By("Testing with invalid package information") - arguments := map[string]interface{}{ - "package_name": "non-existent-package-12345", - "ecosystem": "npm", - "version": "1.0.0", - } + By("Testing get vulnerability by ID") + mcpClient.ExpectToolExists(ctx, "get_vulnerability") - // This should not fail, but should return empty results - result, err := mcpClient.CallTool(ctx, "query_vulnerability", arguments) - Expect(err).ToNot(HaveOccurred(), "Should handle invalid queries gracefully") - Expect(result).ToNot(BeNil(), "Should return a result even for non-existent packages") + arguments := map[string]interface{}{ + "id": "GHSA-vqj2-4v8m-8vrq", // Example from OSV docs + } - GinkgoWriter.Printf("Invalid query result: %+v\n", result.Content) - }) - }) + result := mcpClient.ExpectToolCall(ctx, "get_vulnerability", arguments) + Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability details") - Context("when managing server lifecycle", func() { - var serverName string + GinkgoWriter.Printf("Vulnerability details result: %+v\n", result.Content) + }) - BeforeEach(func() { - // Generate unique server name for each lifecycle test - serverName = generateUniqueServerName("osv-lifecycle-test") + It("should handle invalid vulnerability queries gracefully [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - // Start a server for lifecycle tests - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "sse", - "osv").ExpectSuccess() - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - }) + By("Testing with invalid package information") + arguments := map[string]interface{}{ + "package_name": "non-existent-package-12345", + "ecosystem": "npm", + "version": "1.0.0", + } - AfterEach(func() { - if config.CleanupAfter { - // Clean up the server after each lifecycle test - err := e2e.StopAndRemoveMCPServer(config, serverName) - Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") - } - }) + // This should not fail, but should return empty results + result, err := mcpClient.CallTool(ctx, "query_vulnerability", arguments) + Expect(err).ToNot(HaveOccurred(), "Should handle invalid queries gracefully") + Expect(result).ToNot(BeNil(), "Should return a result even for non-existent packages") - It("should stop the SSE server successfully [Serial]", func() { - By("Stopping the server") - stdout, _ := e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName), "Output should mention the server name") - - By("Verifying the server is stopped") - Eventually(func() string { - stdout, _ := e2e.NewTHVCommand(config, "list", "--all").ExpectSuccess() - return stdout - }, 10*time.Second, 1*time.Second).Should(Or( - // Server should either be in exited state or completely removed - And(ContainSubstring(serverName), ContainSubstring("stopped")), - Not(ContainSubstring(serverName)), - ), "Server should be stopped (exited) or removed from list") - }) + GinkgoWriter.Printf("Invalid query result: %+v\n", result.Content) + }) + }) - It("should restart the SSE server successfully [Serial]", func() { - By("Restarting the server") - stdout, _ := e2e.NewTHVCommand(config, "restart", serverName).ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName)) + Context("when managing server lifecycle", func() { + var serverName string - By("Waiting for the server to be running again") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) + BeforeEach(func() { + // Generate unique server name for each lifecycle test + serverName = generateUniqueServerName("osv-lifecycle-test") - By("Verifying SSE endpoint is accessible again") - serverURL, err := e2e.GetMCPServerURL(config, serverName) - Expect(err).ToNot(HaveOccurred()) + // Start a server for lifecycle tests + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "osv").ExpectSuccess() + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) + }) - client := &http.Client{Timeout: 5 * time.Second} - resp, err := client.Get(serverURL) - if err == nil { - resp.Body.Close() - } - // Connection attempt should not fail completely - }) + AfterEach(func() { + if config.CleanupAfter { + // Clean up the server after each lifecycle test + err := e2e.StopAndRemoveMCPServer(config, serverName) + Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") + } + }) + + It("should stop the streamable-http server successfully [Serial]", func() { + By("Stopping the server") + stdout, _ := e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName), "Output should mention the server name") + + By("Verifying the server is stopped") + Eventually(func() string { + stdout, _ := e2e.NewTHVCommand(config, "list", "--all").ExpectSuccess() + return stdout + }, 10*time.Second, 1*time.Second).Should(Or( + // Server should either be in exited state or completely removed + And(ContainSubstring(serverName), ContainSubstring("stopped")), + Not(ContainSubstring(serverName)), + ), "Server should be stopped (exited) or removed from list") + }) + + It("should restart the streamable-http server successfully [Serial]", func() { + By("Restarting the server") + stdout, _ := e2e.NewTHVCommand(config, "restart", serverName).ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName)) + + By("Waiting for the server to be running again") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying streamable-http endpoint is accessible again") + serverURL, err := e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred()) + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(serverURL) + if err == nil { + resp.Body.Close() + } + // Connection attempt should not fail completely }) }) + }) - Describe("Error handling for SSE transport", func() { + Describe("Error handling for streamable-http transport", func() { Context("when providing invalid configuration", func() { var serverName string @@ -405,7 +405,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { // Check if the command succeeded or failed if err != nil { - // If it failed, that's expected for SSE-only servers + // If it failed, that's expected for streamable-http-only servers Expect(stderr).To(ContainSubstring("transport"), "Error should mention transport issue") } else { // If it succeeded, OSV supports both transports @@ -425,7 +425,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { By("Starting the first OSV MCP server") e2e.NewTHVCommand(config, "run", "--name", serverName, - "--transport", "sse", "osv").ExpectSuccess() + "--transport", "streamable-http", "osv").ExpectSuccess() // ensure it's actually up before attempting the duplicate err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) @@ -436,7 +436,7 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { // examine stdout/stderr stdout, stderr, runErr := e2e.NewTHVCommand(config, "run", "--name", serverName, - "--transport", "sse", + "--transport", "streamable-http", "osv").Run() // The second run must fail because the name already exists @@ -458,97 +458,97 @@ var _ = Describe("OsvMcpServer", Label("mcp", "sse", "e2e"), Serial, func() { Describe("Running OSV MCP server in the foreground", func() { Context("when running OSV server in foreground", func() { - It("starts, creates PID file, stays healthy, then stops & removes PID file [Serial]", func() { - serverName := generateUniqueServerName("osv-foreground-test") - - // 1) Start the foreground process in the background (goroutine) with a generous timeout. - done := make(chan struct{}) - fgStdout := "" - fgStderr := "" - go func() { - out, errOut, _ := e2e.NewTHVCommand( - config, "run", - "--name", serverName, - "--transport", "sse", - "--foreground", - "osv", - ).RunWithTimeout(5 * time.Minute) - fgStdout, fgStderr = out, errOut - close(done) - }() - - // Always try to stop the server at the end so the goroutine returns. - defer func() { - _, _, _ = e2e.NewTHVCommand(config, "stop", serverName).Run() - select { - case <-done: - case <-time.After(15 * time.Second): - // Nothing else we can signal directly; the RunWithTimeout will eventually kill it. - } - }() - - // 2) Wait until the server is reported as running. - By("waiting for foreground server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred(), "server should reach running state") - - // 3) Verify workload is running via workload manager - By("verifying workload status is running via workload manager") - Eventually(func() runtime.WorkloadStatus { - ctx := context.Background() - manager, err := workloads.NewManager(ctx) - if err != nil { - return runtime.WorkloadStatusError - } - workload, err := manager.GetWorkload(ctx, serverName) - if err != nil { - return runtime.WorkloadStatusError - } - return workload.Status - }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusRunning), "workload should be in running status") - - // 5) Dwell 5 seconds, then confirm health/ready. - By("waiting 5 seconds and checking health") - time.Sleep(5 * time.Second) - - stdout, _ := e2e.NewTHVCommand(config, "list").ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName), "server should be listed") - Expect(stdout).To(ContainSubstring("running"), "server should be running") - - if serverURL, gerr := e2e.GetMCPServerURL(config, serverName); gerr == nil { - rerr := e2e.WaitForMCPServerReady(config, serverURL, "sse", 5*time.Minute) - Expect(rerr).ToNot(HaveOccurred(), "server should be protocol-ready") - } - - // 6) Stop the server; this should unblock the goroutine. - By("stopping the foreground server") - _, _ = e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() - - // Wait for the run goroutine to exit. + It("starts, creates PID file, stays healthy, then stops & removes PID file [Serial]", func() { + serverName := generateUniqueServerName("osv-foreground-test") + + // 1) Start the foreground process in the background (goroutine) with a generous timeout. + done := make(chan struct{}) + fgStdout := "" + fgStderr := "" + go func() { + out, errOut, _ := e2e.NewTHVCommand( + config, "run", + "--name", serverName, + "--transport", "streamable-http", + "--foreground", + "osv", + ).RunWithTimeout(5 * time.Minute) + fgStdout, fgStderr = out, errOut + close(done) + }() + + // Always try to stop the server at the end so the goroutine returns. + defer func() { + _, _, _ = e2e.NewTHVCommand(config, "stop", serverName).Run() select { case <-done: - // ok case <-time.After(15 * time.Second): - Fail("foreground run did not exit after stop; stdout="+fgStdout+" stderr="+fgStderr, 1) + // Nothing else we can signal directly; the RunWithTimeout will eventually kill it. } + }() - // 7) Workload should be stopped via workload manager. - By("verifying workload status is stopped via workload manager") - Eventually(func() runtime.WorkloadStatus { - ctx := context.Background() - manager, err := workloads.NewManager(ctx) - if err != nil { - return runtime.WorkloadStatusError - } - workload, err := manager.GetWorkload(ctx, serverName) - if err != nil { - // If workload not found, it means it was properly cleaned up - return runtime.WorkloadStatusStopped - } - return workload.Status - }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusStopped), "workload should be in stopped status after stop") + // 2) Wait until the server is reported as running. + By("waiting for foreground server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "server should reach running state") - }) + // 3) Verify workload is running via workload manager + By("verifying workload status is running via workload manager") + Eventually(func() runtime.WorkloadStatus { + ctx := context.Background() + manager, err := workloads.NewManager(ctx) + if err != nil { + return runtime.WorkloadStatusError + } + workload, err := manager.GetWorkload(ctx, serverName) + if err != nil { + return runtime.WorkloadStatusError + } + return workload.Status + }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusRunning), "workload should be in running status") + + // 5) Dwell 5 seconds, then confirm health/ready. + By("waiting 5 seconds and checking health") + time.Sleep(5 * time.Second) + + stdout, _ := e2e.NewTHVCommand(config, "list").ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName), "server should be listed") + Expect(stdout).To(ContainSubstring("running"), "server should be running") + + if serverURL, gerr := e2e.GetMCPServerURL(config, serverName); gerr == nil { + rerr := e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) + Expect(rerr).ToNot(HaveOccurred(), "server should be protocol-ready") + } + + // 6) Stop the server; this should unblock the goroutine. + By("stopping the foreground server") + _, _ = e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() + + // Wait for the run goroutine to exit. + select { + case <-done: + // ok + case <-time.After(15 * time.Second): + Fail("foreground run did not exit after stop; stdout="+fgStdout+" stderr="+fgStderr, 1) + } + + // 7) Workload should be stopped via workload manager. + By("verifying workload status is stopped via workload manager") + Eventually(func() runtime.WorkloadStatus { + ctx := context.Background() + manager, err := workloads.NewManager(ctx) + if err != nil { + return runtime.WorkloadStatusError + } + workload, err := manager.GetWorkload(ctx, serverName) + if err != nil { + // If workload not found, it means it was properly cleaned up + return runtime.WorkloadStatusStopped + } + return workload.Status + }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusStopped), "workload should be in stopped status after stop") + + }) }) }) From 59cb6570a185e5404f270e751a75343cc88b07dd Mon Sep 17 00:00:00 2001 From: nigel brown Date: Sat, 8 Nov 2025 07:18:46 +0000 Subject: [PATCH 09/10] fix(test): update OSV e2e tests to use streamable-http transport The OSV MCP server transport changed from sse to streamable-http in the registry update on 2025-11-04 (commit 31f2d8b2). This was causing test failures with 5-minute timeouts as the tests were still trying to start the server with --transport sse. Changes: - Updated all transport references from sse to streamable-http - Changed MCP client initialization from NewMCPClientForSSE to NewMCPClientForStreamableHTTP - Updated URL endpoint expectations from /sse to /mcp - Fixed formatting issues with golangci-lint Tests now pass: 13 Passed | 0 Failed --- test/e2e/osv_mcp_server_test.go | 714 ++++++++++++++++---------------- 1 file changed, 357 insertions(+), 357 deletions(-) diff --git a/test/e2e/osv_mcp_server_test.go b/test/e2e/osv_mcp_server_test.go index a3d574848..c84b691da 100644 --- a/test/e2e/osv_mcp_server_test.go +++ b/test/e2e/osv_mcp_server_test.go @@ -47,336 +47,336 @@ var _ = Describe("OsvMcpServer", Label("mcp", "streamable-http", "e2e"), Serial, } }) - It("should successfully start and be accessible via streamable-http [Serial]", func() { - By("Starting the OSV MCP server with streamable-http transport and audit enabled") - stdout, stderr := e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "streamable-http", - "--enable-audit", - "osv").ExpectSuccess() - - // The command should indicate success - Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the OSV server") + It("should successfully start and be accessible via streamable-http [Serial]", func() { + By("Starting the OSV MCP server with streamable-http transport and audit enabled") + stdout, stderr := e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "--enable-audit", + "osv").ExpectSuccess() - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred(), "Server should be running within 5 minutes") + // The command should indicate success + Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the OSV server") - By("Verifying the server appears in the list with streamable-http transport") - stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName), "Server should appear in the list") - Expect(stdout).To(ContainSubstring("running"), "Server should be in running state") - Expect(stdout).To(ContainSubstring("mcp"), "Server should show mcp endpoint") - }) + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "Server should be running within 5 minutes") - It("should be accessible via HTTP streamable-http endpoint [Serial]", func() { - By("Starting the OSV MCP server with audit enabled") - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "streamable-http", - "--enable-audit", - "osv").ExpectSuccess() - - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - - By("Getting the server URL") - serverURL, err := e2e.GetMCPServerURL(config, serverName) - Expect(err).ToNot(HaveOccurred(), "Should be able to get server URL") - Expect(serverURL).To(ContainSubstring("http"), "URL should be HTTP-based") - Expect(serverURL).To(ContainSubstring("/mcp"), "URL should contain MCP endpoint") + By("Verifying the server appears in the list with streamable-http transport") + stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName), "Server should appear in the list") + Expect(stdout).To(ContainSubstring("running"), "Server should be in running state") + Expect(stdout).To(ContainSubstring("mcp"), "Server should show mcp endpoint") + }) - By("Waiting before starting the HTTP request") - time.Sleep(10 * time.Second) + It("should be accessible via HTTP streamable-http endpoint [Serial]", func() { + By("Starting the OSV MCP server with audit enabled") + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "--enable-audit", + "osv").ExpectSuccess() - By("Making an HTTP request to the streamable-http endpoint") + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) - client := &http.Client{Timeout: 10 * time.Second} - var resp *http.Response - var httpErr error + By("Getting the server URL") + serverURL, err := e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred(), "Should be able to get server URL") + Expect(serverURL).To(ContainSubstring("http"), "URL should be HTTP-based") + Expect(serverURL).To(ContainSubstring("/mcp"), "URL should contain MCP endpoint") - maxRetries := 5 - for i := 0; i < maxRetries; i++ { - req, err := http.NewRequest("GET", serverURL, nil) - Expect(err).ToNot(HaveOccurred()) - req.Header.Set("Accept", "text/event-stream") + By("Waiting before starting the HTTP request") + time.Sleep(10 * time.Second) - resp, httpErr = client.Do(req) - if httpErr == nil && resp.StatusCode >= 200 && resp.StatusCode < 500 { - break + By("Making an HTTP request to the streamable-http endpoint") + + client := &http.Client{Timeout: 10 * time.Second} + var resp *http.Response + var httpErr error + + maxRetries := 5 + for i := 0; i < maxRetries; i++ { + req, err := http.NewRequest("GET", serverURL, nil) + Expect(err).ToNot(HaveOccurred()) + req.Header.Set("Accept", "text/event-stream") + + resp, httpErr = client.Do(req) + if httpErr == nil && resp.StatusCode >= 200 && resp.StatusCode < 500 { + break + } + if resp != nil { + resp.Body.Close() + } + time.Sleep(10 * time.Second) } - if resp != nil { - resp.Body.Close() - } - time.Sleep(10 * time.Second) - } - Expect(httpErr).ToNot(HaveOccurred(), "Should be able to connect to streamable-http endpoint") - Expect(resp).ToNot(BeNil(), "Response should not be nil") - defer resp.Body.Close() + Expect(httpErr).ToNot(HaveOccurred(), "Should be able to connect to streamable-http endpoint") + Expect(resp).ToNot(BeNil(), "Response should not be nil") + defer resp.Body.Close() - Expect(resp.StatusCode).To(BeNumerically(">=", 200), "Should get a valid HTTP response") - Expect(resp.StatusCode).To(BeNumerically("<", 500), "Should not get a server error") - }) + Expect(resp.StatusCode).To(BeNumerically(">=", 200), "Should get a valid HTTP response") + Expect(resp.StatusCode).To(BeNumerically("<", 500), "Should not get a server error") + }) - It("should respond to proper MCP protocol operations [Serial]", func() { - By("Starting the OSV MCP server") - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "streamable-http", - "osv").ExpectSuccess() + It("should respond to proper MCP protocol operations [Serial]", func() { + By("Starting the OSV MCP server") + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "osv").ExpectSuccess() - By("Waiting for the server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) + By("Waiting for the server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) - By("Getting the server URL") - serverURL, err := e2e.GetMCPServerURL(config, serverName) - Expect(err).ToNot(HaveOccurred()) + By("Getting the server URL") + serverURL, err := e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred()) - By("Waiting for MCP server to be ready") - err = e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) - Expect(err).ToNot(HaveOccurred(), "MCP server should be ready for protocol operations") + By("Waiting for MCP server to be ready") + err = e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "MCP server should be ready for protocol operations") - By("Creating MCP client and initializing connection") - mcpClient, err := e2e.NewMCPClientForStreamableHTTP(config, serverURL) - Expect(err).ToNot(HaveOccurred(), "Should be able to create MCP client") - defer mcpClient.Close() + By("Creating MCP client and initializing connection") + mcpClient, err := e2e.NewMCPClientForStreamableHTTP(config, serverURL) + Expect(err).ToNot(HaveOccurred(), "Should be able to create MCP client") + defer mcpClient.Close() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - err = mcpClient.Initialize(ctx) - Expect(err).ToNot(HaveOccurred(), "Should be able to initialize MCP connection") + err = mcpClient.Initialize(ctx) + Expect(err).ToNot(HaveOccurred(), "Should be able to initialize MCP connection") - By("Testing basic MCP operations") - err = mcpClient.Ping(ctx) - Expect(err).ToNot(HaveOccurred(), "Should be able to ping the server") + By("Testing basic MCP operations") + err = mcpClient.Ping(ctx) + Expect(err).ToNot(HaveOccurred(), "Should be able to ping the server") - By("Listing available tools") - tools, err := mcpClient.ListTools(ctx) - Expect(err).ToNot(HaveOccurred(), "Should be able to list tools") - Expect(tools.Tools).ToNot(BeEmpty(), "OSV server should provide tools") + By("Listing available tools") + tools, err := mcpClient.ListTools(ctx) + Expect(err).ToNot(HaveOccurred(), "Should be able to list tools") + Expect(tools.Tools).ToNot(BeEmpty(), "OSV server should provide tools") - GinkgoWriter.Printf("Available tools: %d\n", len(tools.Tools)) - for _, tool := range tools.Tools { - GinkgoWriter.Printf(" - %s: %s\n", tool.Name, tool.Description) - } - }) - }) + GinkgoWriter.Printf("Available tools: %d\n", len(tools.Tools)) + for _, tool := range tools.Tools { + GinkgoWriter.Printf(" - %s: %s\n", tool.Name, tool.Description) + } + }) + }) - Context("when testing OSV-specific functionality", Ordered, func() { - var mcpClient *e2e.MCPClientHelper - var serverURL string - var cancel context.CancelFunc - var serverName string + Context("when testing OSV-specific functionality", Ordered, func() { + var mcpClient *e2e.MCPClientHelper + var serverURL string + var cancel context.CancelFunc + var serverName string - BeforeAll(func() { - // Generate unique server name for this context - serverName = generateUniqueServerName("osv-functionality-test") + BeforeAll(func() { + // Generate unique server name for this context + serverName = generateUniqueServerName("osv-functionality-test") - // Start ONE server for ALL OSV-specific tests - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "streamable-http", - "osv").ExpectSuccess() - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) + // Start ONE server for ALL OSV-specific tests + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "osv").ExpectSuccess() + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) - // Get server URL - serverURL, err = e2e.GetMCPServerURL(config, serverName) - Expect(err).ToNot(HaveOccurred()) + // Get server URL + serverURL, err = e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred()) - err = e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - }) + err = e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) + }) - BeforeEach(func() { - // Create fresh MCP client for each test - var err error - mcpClient, err = e2e.NewMCPClientForStreamableHTTP(config, serverURL) - Expect(err).ToNot(HaveOccurred()) - - // Create context that will be cancelled in AfterEach - ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) - cancel = cancelFunc - err = mcpClient.Initialize(ctx) - Expect(err).ToNot(HaveOccurred()) - }) + BeforeEach(func() { + // Create fresh MCP client for each test + var err error + mcpClient, err = e2e.NewMCPClientForStreamableHTTP(config, serverURL) + Expect(err).ToNot(HaveOccurred()) - AfterEach(func() { - if cancel != nil { - cancel() - } - if mcpClient != nil { - mcpClient.Close() - } - }) + // Create context that will be cancelled in AfterEach + ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) + cancel = cancelFunc + err = mcpClient.Initialize(ctx) + Expect(err).ToNot(HaveOccurred()) + }) - AfterAll(func() { - if config.CleanupAfter { - // Clean up the shared server after all tests - err := e2e.StopAndRemoveMCPServer(config, serverName) - Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") - } - }) + AfterEach(func() { + if cancel != nil { + cancel() + } + if mcpClient != nil { + mcpClient.Close() + } + }) - It("should be listed in registry with OSV-specific information [Serial]", func() { - By("Getting OSV server info from registry") - stdout, _ := e2e.NewTHVCommand(config, "registry", "info", "osv").ExpectSuccess() - Expect(stdout).To(ContainSubstring("osv"), "Info should be about OSV server") - Expect(stdout).To(ContainSubstring("vulnerability"), "Info should mention vulnerability scanning") - }) + AfterAll(func() { + if config.CleanupAfter { + // Clean up the shared server after all tests + err := e2e.StopAndRemoveMCPServer(config, serverName) + Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") + } + }) - It("should provide vulnerability query tools [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + It("should be listed in registry with OSV-specific information [Serial]", func() { + By("Getting OSV server info from registry") + stdout, _ := e2e.NewTHVCommand(config, "registry", "info", "osv").ExpectSuccess() + Expect(stdout).To(ContainSubstring("osv"), "Info should be about OSV server") + Expect(stdout).To(ContainSubstring("vulnerability"), "Info should mention vulnerability scanning") + }) - By("Listing available tools") - mcpClient.ExpectToolExists(ctx, "query_vulnerability") + It("should provide vulnerability query tools [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - By("Testing vulnerability query with a known package") - // Test with a well-known package that should have vulnerabilities - arguments := map[string]interface{}{ - "package_name": "lodash", - "ecosystem": "npm", - "version": "4.17.15", // Known vulnerable version from OSV docs - } + By("Listing available tools") + mcpClient.ExpectToolExists(ctx, "query_vulnerability") - result := mcpClient.ExpectToolCall(ctx, "query_vulnerability", arguments) - Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability information") + By("Testing vulnerability query with a known package") + // Test with a well-known package that should have vulnerabilities + arguments := map[string]interface{}{ + "package_name": "lodash", + "ecosystem": "npm", + "version": "4.17.15", // Known vulnerable version from OSV docs + } - GinkgoWriter.Printf("Vulnerability query result: %+v\n", result.Content) - }) + result := mcpClient.ExpectToolCall(ctx, "query_vulnerability", arguments) + Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability information") - It("should handle batch vulnerability queries [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + GinkgoWriter.Printf("Vulnerability query result: %+v\n", result.Content) + }) - By("Testing batch vulnerability query") - mcpClient.ExpectToolExists(ctx, "query_vulnerabilities_batch") + It("should handle batch vulnerability queries [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + By("Testing batch vulnerability query") + mcpClient.ExpectToolExists(ctx, "query_vulnerabilities_batch") + + arguments := map[string]interface{}{ + "queries": []map[string]interface{}{ + { + "package_name": "lodash", + "ecosystem": "npm", + "version": "4.17.15", + }, + { + "package_name": "jinja2", + "ecosystem": "PyPI", + "version": "2.4.1", + }, + }, + } - arguments := map[string]interface{}{ - "queries": []map[string]interface{}{ - { - "package_name": "lodash", - "ecosystem": "npm", - "version": "4.17.15", - }, - { - "package_name": "jinja2", - "ecosystem": "PyPI", - "version": "2.4.1", - }, - }, - } - - result := mcpClient.ExpectToolCall(ctx, "query_vulnerabilities_batch", arguments) - Expect(result.Content).ToNot(BeEmpty(), "Should return batch vulnerability information") - - GinkgoWriter.Printf("Batch vulnerability query result: %+v\n", result.Content) - }) + result := mcpClient.ExpectToolCall(ctx, "query_vulnerabilities_batch", arguments) + Expect(result.Content).ToNot(BeEmpty(), "Should return batch vulnerability information") - It("should get vulnerability details by ID [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + GinkgoWriter.Printf("Batch vulnerability query result: %+v\n", result.Content) + }) - By("Testing get vulnerability by ID") - mcpClient.ExpectToolExists(ctx, "get_vulnerability") + It("should get vulnerability details by ID [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - arguments := map[string]interface{}{ - "id": "GHSA-vqj2-4v8m-8vrq", // Example from OSV docs - } + By("Testing get vulnerability by ID") + mcpClient.ExpectToolExists(ctx, "get_vulnerability") - result := mcpClient.ExpectToolCall(ctx, "get_vulnerability", arguments) - Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability details") + arguments := map[string]interface{}{ + "id": "GHSA-vqj2-4v8m-8vrq", // Example from OSV docs + } - GinkgoWriter.Printf("Vulnerability details result: %+v\n", result.Content) - }) + result := mcpClient.ExpectToolCall(ctx, "get_vulnerability", arguments) + Expect(result.Content).ToNot(BeEmpty(), "Should return vulnerability details") - It("should handle invalid vulnerability queries gracefully [Serial]", func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + GinkgoWriter.Printf("Vulnerability details result: %+v\n", result.Content) + }) - By("Testing with invalid package information") - arguments := map[string]interface{}{ - "package_name": "non-existent-package-12345", - "ecosystem": "npm", - "version": "1.0.0", - } + It("should handle invalid vulnerability queries gracefully [Serial]", func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - // This should not fail, but should return empty results - result, err := mcpClient.CallTool(ctx, "query_vulnerability", arguments) - Expect(err).ToNot(HaveOccurred(), "Should handle invalid queries gracefully") - Expect(result).ToNot(BeNil(), "Should return a result even for non-existent packages") + By("Testing with invalid package information") + arguments := map[string]interface{}{ + "package_name": "non-existent-package-12345", + "ecosystem": "npm", + "version": "1.0.0", + } - GinkgoWriter.Printf("Invalid query result: %+v\n", result.Content) - }) - }) + // This should not fail, but should return empty results + result, err := mcpClient.CallTool(ctx, "query_vulnerability", arguments) + Expect(err).ToNot(HaveOccurred(), "Should handle invalid queries gracefully") + Expect(result).ToNot(BeNil(), "Should return a result even for non-existent packages") - Context("when managing server lifecycle", func() { - var serverName string + GinkgoWriter.Printf("Invalid query result: %+v\n", result.Content) + }) + }) - BeforeEach(func() { - // Generate unique server name for each lifecycle test - serverName = generateUniqueServerName("osv-lifecycle-test") + Context("when managing server lifecycle", func() { + var serverName string - // Start a server for lifecycle tests - e2e.NewTHVCommand(config, "run", - "--name", serverName, - "--transport", "streamable-http", - "osv").ExpectSuccess() - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - }) + BeforeEach(func() { + // Generate unique server name for each lifecycle test + serverName = generateUniqueServerName("osv-lifecycle-test") - AfterEach(func() { - if config.CleanupAfter { - // Clean up the server after each lifecycle test - err := e2e.StopAndRemoveMCPServer(config, serverName) - Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") - } - }) + // Start a server for lifecycle tests + e2e.NewTHVCommand(config, "run", + "--name", serverName, + "--transport", "streamable-http", + "osv").ExpectSuccess() + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) + }) - It("should stop the streamable-http server successfully [Serial]", func() { - By("Stopping the server") - stdout, _ := e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName), "Output should mention the server name") - - By("Verifying the server is stopped") - Eventually(func() string { - stdout, _ := e2e.NewTHVCommand(config, "list", "--all").ExpectSuccess() - return stdout - }, 10*time.Second, 1*time.Second).Should(Or( - // Server should either be in exited state or completely removed - And(ContainSubstring(serverName), ContainSubstring("stopped")), - Not(ContainSubstring(serverName)), - ), "Server should be stopped (exited) or removed from list") - }) + AfterEach(func() { + if config.CleanupAfter { + // Clean up the server after each lifecycle test + err := e2e.StopAndRemoveMCPServer(config, serverName) + Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server") + } + }) - It("should restart the streamable-http server successfully [Serial]", func() { - By("Restarting the server") - stdout, _ := e2e.NewTHVCommand(config, "restart", serverName).ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName)) + It("should stop the streamable-http server successfully [Serial]", func() { + By("Stopping the server") + stdout, _ := e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName), "Output should mention the server name") + + By("Verifying the server is stopped") + Eventually(func() string { + stdout, _ := e2e.NewTHVCommand(config, "list", "--all").ExpectSuccess() + return stdout + }, 10*time.Second, 1*time.Second).Should(Or( + // Server should either be in exited state or completely removed + And(ContainSubstring(serverName), ContainSubstring("stopped")), + Not(ContainSubstring(serverName)), + ), "Server should be stopped (exited) or removed from list") + }) - By("Waiting for the server to be running again") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) + It("should restart the streamable-http server successfully [Serial]", func() { + By("Restarting the server") + stdout, _ := e2e.NewTHVCommand(config, "restart", serverName).ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName)) - By("Verifying streamable-http endpoint is accessible again") - serverURL, err := e2e.GetMCPServerURL(config, serverName) - Expect(err).ToNot(HaveOccurred()) + By("Waiting for the server to be running again") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) - client := &http.Client{Timeout: 5 * time.Second} - resp, err := client.Get(serverURL) - if err == nil { - resp.Body.Close() - } - // Connection attempt should not fail completely + By("Verifying streamable-http endpoint is accessible again") + serverURL, err := e2e.GetMCPServerURL(config, serverName) + Expect(err).ToNot(HaveOccurred()) + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(serverURL) + if err == nil { + resp.Body.Close() + } + // Connection attempt should not fail completely + }) }) }) - }) Describe("Error handling for streamable-http transport", func() { Context("when providing invalid configuration", func() { @@ -458,97 +458,97 @@ var _ = Describe("OsvMcpServer", Label("mcp", "streamable-http", "e2e"), Serial, Describe("Running OSV MCP server in the foreground", func() { Context("when running OSV server in foreground", func() { - It("starts, creates PID file, stays healthy, then stops & removes PID file [Serial]", func() { - serverName := generateUniqueServerName("osv-foreground-test") - - // 1) Start the foreground process in the background (goroutine) with a generous timeout. - done := make(chan struct{}) - fgStdout := "" - fgStderr := "" - go func() { - out, errOut, _ := e2e.NewTHVCommand( - config, "run", - "--name", serverName, - "--transport", "streamable-http", - "--foreground", - "osv", - ).RunWithTimeout(5 * time.Minute) - fgStdout, fgStderr = out, errOut - close(done) - }() - - // Always try to stop the server at the end so the goroutine returns. - defer func() { - _, _, _ = e2e.NewTHVCommand(config, "stop", serverName).Run() - select { - case <-done: - case <-time.After(15 * time.Second): - // Nothing else we can signal directly; the RunWithTimeout will eventually kill it. + It("starts, creates PID file, stays healthy, then stops & removes PID file [Serial]", func() { + serverName := generateUniqueServerName("osv-foreground-test") + + // 1) Start the foreground process in the background (goroutine) with a generous timeout. + done := make(chan struct{}) + fgStdout := "" + fgStderr := "" + go func() { + out, errOut, _ := e2e.NewTHVCommand( + config, "run", + "--name", serverName, + "--transport", "streamable-http", + "--foreground", + "osv", + ).RunWithTimeout(5 * time.Minute) + fgStdout, fgStderr = out, errOut + close(done) + }() + + // Always try to stop the server at the end so the goroutine returns. + defer func() { + _, _, _ = e2e.NewTHVCommand(config, "stop", serverName).Run() + select { + case <-done: + case <-time.After(15 * time.Second): + // Nothing else we can signal directly; the RunWithTimeout will eventually kill it. + } + }() + + // 2) Wait until the server is reported as running. + By("waiting for foreground server to be running") + err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) + Expect(err).ToNot(HaveOccurred(), "server should reach running state") + + // 3) Verify workload is running via workload manager + By("verifying workload status is running via workload manager") + Eventually(func() runtime.WorkloadStatus { + ctx := context.Background() + manager, err := workloads.NewManager(ctx) + if err != nil { + return runtime.WorkloadStatusError + } + workload, err := manager.GetWorkload(ctx, serverName) + if err != nil { + return runtime.WorkloadStatusError + } + return workload.Status + }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusRunning), "workload should be in running status") + + // 5) Dwell 5 seconds, then confirm health/ready. + By("waiting 5 seconds and checking health") + time.Sleep(5 * time.Second) + + stdout, _ := e2e.NewTHVCommand(config, "list").ExpectSuccess() + Expect(stdout).To(ContainSubstring(serverName), "server should be listed") + Expect(stdout).To(ContainSubstring("running"), "server should be running") + + if serverURL, gerr := e2e.GetMCPServerURL(config, serverName); gerr == nil { + rerr := e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) + Expect(rerr).ToNot(HaveOccurred(), "server should be protocol-ready") } - }() - // 2) Wait until the server is reported as running. - By("waiting for foreground server to be running") - err := e2e.WaitForMCPServer(config, serverName, 5*time.Minute) - Expect(err).ToNot(HaveOccurred(), "server should reach running state") + // 6) Stop the server; this should unblock the goroutine. + By("stopping the foreground server") + _, _ = e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() - // 3) Verify workload is running via workload manager - By("verifying workload status is running via workload manager") - Eventually(func() runtime.WorkloadStatus { - ctx := context.Background() - manager, err := workloads.NewManager(ctx) - if err != nil { - return runtime.WorkloadStatusError - } - workload, err := manager.GetWorkload(ctx, serverName) - if err != nil { - return runtime.WorkloadStatusError + // Wait for the run goroutine to exit. + select { + case <-done: + // ok + case <-time.After(15 * time.Second): + Fail("foreground run did not exit after stop; stdout="+fgStdout+" stderr="+fgStderr, 1) } - return workload.Status - }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusRunning), "workload should be in running status") - - // 5) Dwell 5 seconds, then confirm health/ready. - By("waiting 5 seconds and checking health") - time.Sleep(5 * time.Second) - - stdout, _ := e2e.NewTHVCommand(config, "list").ExpectSuccess() - Expect(stdout).To(ContainSubstring(serverName), "server should be listed") - Expect(stdout).To(ContainSubstring("running"), "server should be running") - - if serverURL, gerr := e2e.GetMCPServerURL(config, serverName); gerr == nil { - rerr := e2e.WaitForMCPServerReady(config, serverURL, "streamable-http", 5*time.Minute) - Expect(rerr).ToNot(HaveOccurred(), "server should be protocol-ready") - } - // 6) Stop the server; this should unblock the goroutine. - By("stopping the foreground server") - _, _ = e2e.NewTHVCommand(config, "stop", serverName).ExpectSuccess() + // 7) Workload should be stopped via workload manager. + By("verifying workload status is stopped via workload manager") + Eventually(func() runtime.WorkloadStatus { + ctx := context.Background() + manager, err := workloads.NewManager(ctx) + if err != nil { + return runtime.WorkloadStatusError + } + workload, err := manager.GetWorkload(ctx, serverName) + if err != nil { + // If workload not found, it means it was properly cleaned up + return runtime.WorkloadStatusStopped + } + return workload.Status + }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusStopped), "workload should be in stopped status after stop") - // Wait for the run goroutine to exit. - select { - case <-done: - // ok - case <-time.After(15 * time.Second): - Fail("foreground run did not exit after stop; stdout="+fgStdout+" stderr="+fgStderr, 1) - } - - // 7) Workload should be stopped via workload manager. - By("verifying workload status is stopped via workload manager") - Eventually(func() runtime.WorkloadStatus { - ctx := context.Background() - manager, err := workloads.NewManager(ctx) - if err != nil { - return runtime.WorkloadStatusError - } - workload, err := manager.GetWorkload(ctx, serverName) - if err != nil { - // If workload not found, it means it was properly cleaned up - return runtime.WorkloadStatusStopped - } - return workload.Status - }, 15*time.Second, 200*time.Millisecond).Should(Equal(runtime.WorkloadStatusStopped), "workload should be in stopped status after stop") - - }) + }) }) }) From 1a1e44bedb6d9e6a122d9651a3bebada194862c5 Mon Sep 17 00:00:00 2001 From: nigel brown Date: Mon, 17 Nov 2025 17:22:56 +0000 Subject: [PATCH 10/10] Skip the stdio servers Signed-off-by: nigel brown --- pkg/runner/runner.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 2345eb640..54fc3fcd4 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -277,6 +277,8 @@ func (r *Runner) Run(ctx context.Context) error { // Wait for the MCP server to accept initialize requests before updating client configurations. // This prevents timing issues where clients try to connect before the server is fully ready. // We repeatedly call initialize until it succeeds (up to 5 minutes). + // Note: We skip this check for pure STDIO transport because STDIO servers may reject + // multiple initialize calls (see #1982). transportType := labels.GetTransportType(r.Config.ContainerLabels) serverURL := transport.GenerateMCPServerURL( transportType, @@ -285,11 +287,18 @@ func (r *Runner) Run(ctx context.Context) error { r.Config.ContainerName, r.Config.RemoteURL) - // Repeatedly try calling initialize until it succeeds (up to 5 minutes) - // Some servers (like mcp-optimizer) can take significant time to start up - if err := waitForInitializeSuccess(ctx, serverURL, transportType, 5*time.Minute); err != nil { - logger.Warnf("Warning: Initialize not successful, but continuing: %v", err) - // Continue anyway to maintain backward compatibility, but log a warning + // Only wait for initialization on non-STDIO transports + // STDIO servers communicate directly via stdin/stdout and calling initialize multiple times + // can cause issues as the behavior is not specified by the MCP spec + if transportType != "stdio" { + // Repeatedly try calling initialize until it succeeds (up to 5 minutes) + // Some servers (like mcp-optimizer) can take significant time to start up + if err := waitForInitializeSuccess(ctx, serverURL, transportType, 5*time.Minute); err != nil { + logger.Warnf("Warning: Initialize not successful, but continuing: %v", err) + // Continue anyway to maintain backward compatibility, but log a warning + } + } else { + logger.Debugf("Skipping initialize check for STDIO transport") } // Update client configurations with the MCP server URL. @@ -464,6 +473,7 @@ func (r *Runner) Cleanup(ctx context.Context) error { // waitForInitializeSuccess repeatedly checks if the MCP server is ready to accept requests. // This prevents timing issues where clients try to connect before the server is fully ready. // It makes repeated attempts with exponential backoff up to a maximum timeout. +// Note: This function should not be called for STDIO transport. func waitForInitializeSuccess(ctx context.Context, serverURL, transportType string, maxWaitTime time.Duration) error { // Determine the endpoint and method to use based on transport type var endpoint string @@ -479,8 +489,8 @@ func waitForInitializeSuccess(ctx context.Context, serverURL, transportType stri payload = `{"jsonrpc":"2.0","method":"initialize","id":"toolhive-init-check",` + `"params":{"protocolVersion":"2024-11-05","capabilities":{},` + `"clientInfo":{"name":"toolhive","version":"1.0"}}}` - case "sse", "stdio": - // For SSE/stdio, just check if the SSE endpoint is available + case "sse": + // For SSE, just check if the SSE endpoint is available // We can't easily call initialize without establishing a full SSE connection, // so we just verify the endpoint responds. // Format: http://localhost:port/sse#container-name -> http://localhost:port/sse