|
2 | 2 | package runner |
3 | 3 |
|
4 | 4 | import ( |
| 5 | + "bytes" |
5 | 6 | "context" |
6 | 7 | "fmt" |
7 | 8 | "net/http" |
8 | 9 | "os" |
9 | 10 | "os/signal" |
| 11 | + "strings" |
10 | 12 | "syscall" |
11 | 13 | "time" |
12 | 14 |
|
@@ -272,21 +274,40 @@ func (r *Runner) Run(ctx context.Context) error { |
272 | 274 |
|
273 | 275 | logger.Infof("MCP server %s started successfully", r.Config.ContainerName) |
274 | 276 |
|
| 277 | + // Wait for the MCP server to accept initialize requests before updating client configurations. |
| 278 | + // This prevents timing issues where clients try to connect before the server is fully ready. |
| 279 | + // We repeatedly call initialize until it succeeds (up to 5 minutes). |
| 280 | + // Note: We skip this check for pure STDIO transport because STDIO servers may reject |
| 281 | + // multiple initialize calls (see #1982). |
| 282 | + transportType := labels.GetTransportType(r.Config.ContainerLabels) |
| 283 | + serverURL := transport.GenerateMCPServerURL( |
| 284 | + transportType, |
| 285 | + "localhost", |
| 286 | + r.Config.Port, |
| 287 | + r.Config.ContainerName, |
| 288 | + r.Config.RemoteURL) |
| 289 | + |
| 290 | + // Only wait for initialization on non-STDIO transports |
| 291 | + // STDIO servers communicate directly via stdin/stdout and calling initialize multiple times |
| 292 | + // can cause issues as the behavior is not specified by the MCP spec |
| 293 | + if transportType != "stdio" { |
| 294 | + // Repeatedly try calling initialize until it succeeds (up to 5 minutes) |
| 295 | + // Some servers (like mcp-optimizer) can take significant time to start up |
| 296 | + if err := waitForInitializeSuccess(ctx, serverURL, transportType, 5*time.Minute); err != nil { |
| 297 | + logger.Warnf("Warning: Initialize not successful, but continuing: %v", err) |
| 298 | + // Continue anyway to maintain backward compatibility, but log a warning |
| 299 | + } |
| 300 | + } else { |
| 301 | + logger.Debugf("Skipping initialize check for STDIO transport") |
| 302 | + } |
| 303 | + |
275 | 304 | // Update client configurations with the MCP server URL. |
276 | 305 | // Note that this function checks the configuration to determine which |
277 | 306 | // clients should be updated, if any. |
278 | 307 | clientManager, err := client.NewManager(ctx) |
279 | 308 | if err != nil { |
280 | 309 | logger.Warnf("Warning: Failed to create client manager: %v", err) |
281 | 310 | } else { |
282 | | - transportType := labels.GetTransportType(r.Config.ContainerLabels) |
283 | | - serverURL := transport.GenerateMCPServerURL( |
284 | | - transportType, |
285 | | - "localhost", |
286 | | - r.Config.Port, |
287 | | - r.Config.ContainerName, |
288 | | - r.Config.RemoteURL) |
289 | | - |
290 | 311 | if err := clientManager.AddServerToClients(ctx, r.Config.ContainerName, serverURL, transportType, r.Config.Group); err != nil { |
291 | 312 | logger.Warnf("Warning: Failed to add server to client configurations: %v", err) |
292 | 313 | } |
@@ -448,3 +469,115 @@ func (r *Runner) Cleanup(ctx context.Context) error { |
448 | 469 |
|
449 | 470 | return lastErr |
450 | 471 | } |
| 472 | + |
| 473 | +// waitForInitializeSuccess repeatedly checks if the MCP server is ready to accept requests. |
| 474 | +// This prevents timing issues where clients try to connect before the server is fully ready. |
| 475 | +// It makes repeated attempts with exponential backoff up to a maximum timeout. |
| 476 | +// Note: This function should not be called for STDIO transport. |
| 477 | +func waitForInitializeSuccess(ctx context.Context, serverURL, transportType string, maxWaitTime time.Duration) error { |
| 478 | + // Determine the endpoint and method to use based on transport type |
| 479 | + var endpoint string |
| 480 | + var method string |
| 481 | + var payload string |
| 482 | + |
| 483 | + switch transportType { |
| 484 | + case "streamable-http", "streamable": |
| 485 | + // For streamable-http, send initialize request to /mcp endpoint |
| 486 | + // Format: http://localhost:port/mcp |
| 487 | + endpoint = serverURL |
| 488 | + method = "POST" |
| 489 | + payload = `{"jsonrpc":"2.0","method":"initialize","id":"toolhive-init-check",` + |
| 490 | + `"params":{"protocolVersion":"2024-11-05","capabilities":{},` + |
| 491 | + `"clientInfo":{"name":"toolhive","version":"1.0"}}}` |
| 492 | + case "sse": |
| 493 | + // For SSE, just check if the SSE endpoint is available |
| 494 | + // We can't easily call initialize without establishing a full SSE connection, |
| 495 | + // so we just verify the endpoint responds. |
| 496 | + // Format: http://localhost:port/sse#container-name -> http://localhost:port/sse |
| 497 | + endpoint = serverURL |
| 498 | + // Remove fragment if present (everything after #) |
| 499 | + if idx := strings.Index(endpoint, "#"); idx != -1 { |
| 500 | + endpoint = endpoint[:idx] |
| 501 | + } |
| 502 | + method = "GET" |
| 503 | + payload = "" |
| 504 | + default: |
| 505 | + // For other transports, no HTTP check is needed |
| 506 | + logger.Debugf("Skipping readiness check for transport type: %s", transportType) |
| 507 | + return nil |
| 508 | + } |
| 509 | + |
| 510 | + // Setup retry logic with exponential backoff |
| 511 | + startTime := time.Now() |
| 512 | + attempt := 0 |
| 513 | + delay := 100 * time.Millisecond |
| 514 | + maxDelay := 2 * time.Second // Cap at 2 seconds between retries |
| 515 | + |
| 516 | + logger.Infof("Waiting for MCP server to be ready at %s (timeout: %v)", endpoint, maxWaitTime) |
| 517 | + |
| 518 | + // Create HTTP client with a reasonable timeout for requests |
| 519 | + httpClient := &http.Client{ |
| 520 | + Timeout: 10 * time.Second, |
| 521 | + } |
| 522 | + |
| 523 | + for { |
| 524 | + attempt++ |
| 525 | + |
| 526 | + // Make the readiness check request |
| 527 | + var req *http.Request |
| 528 | + var err error |
| 529 | + if payload != "" { |
| 530 | + req, err = http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBufferString(payload)) |
| 531 | + } else { |
| 532 | + req, err = http.NewRequestWithContext(ctx, method, endpoint, nil) |
| 533 | + } |
| 534 | + |
| 535 | + if err != nil { |
| 536 | + logger.Debugf("Failed to create request (attempt %d): %v", attempt, err) |
| 537 | + } else { |
| 538 | + if method == "POST" { |
| 539 | + req.Header.Set("Content-Type", "application/json") |
| 540 | + req.Header.Set("Accept", "application/json, text/event-stream") |
| 541 | + req.Header.Set("MCP-Protocol-Version", "2024-11-05") |
| 542 | + } |
| 543 | + |
| 544 | + resp, err := httpClient.Do(req) |
| 545 | + if err == nil { |
| 546 | + //nolint:errcheck // Ignoring close error on response body in error path |
| 547 | + defer resp.Body.Close() |
| 548 | + |
| 549 | + // For GET (SSE), accept 200 OK |
| 550 | + // For POST (streamable-http), also accept 200 OK |
| 551 | + if resp.StatusCode == http.StatusOK { |
| 552 | + elapsed := time.Since(startTime) |
| 553 | + logger.Infof("MCP server is ready after %v (attempt %d)", elapsed, attempt) |
| 554 | + return nil |
| 555 | + } |
| 556 | + |
| 557 | + logger.Debugf("Server returned status %d (attempt %d)", resp.StatusCode, attempt) |
| 558 | + } else { |
| 559 | + logger.Debugf("Failed to reach endpoint (attempt %d): %v", attempt, err) |
| 560 | + } |
| 561 | + } |
| 562 | + |
| 563 | + // Check if we've exceeded the maximum wait time |
| 564 | + elapsed := time.Since(startTime) |
| 565 | + if elapsed >= maxWaitTime { |
| 566 | + return fmt.Errorf("initialize not successful after %v (%d attempts)", elapsed, attempt) |
| 567 | + } |
| 568 | + |
| 569 | + // Wait before retrying |
| 570 | + select { |
| 571 | + case <-ctx.Done(): |
| 572 | + return fmt.Errorf("context cancelled while waiting for initialize") |
| 573 | + case <-time.After(delay): |
| 574 | + // Continue to next attempt |
| 575 | + } |
| 576 | + |
| 577 | + // Update delay for next iteration with exponential backoff |
| 578 | + delay *= 2 |
| 579 | + if delay > maxDelay { |
| 580 | + delay = maxDelay |
| 581 | + } |
| 582 | + } |
| 583 | +} |
0 commit comments