diff --git a/internal/upstream/managed/client.go b/internal/upstream/managed/client.go index e4b92461..d4c65db5 100644 --- a/internal/upstream/managed/client.go +++ b/internal/upstream/managed/client.go @@ -64,8 +64,22 @@ type Client struct { // Tool discovery callback for notifications/tools/list_changed handling toolDiscoveryCallback func(ctx context.Context, serverName string) error + + // consecutiveHealthFailures counts back-to-back transient health-check + // failures. The state-machine only flips to Error once it reaches + // healthCheckFailureThreshold; one success resets it. Hard failures + // (connection refused, no such host, unreachable) bypass the counter + // and trigger Error immediately. See recordHealthCheckFailure(). + consecutiveHealthFailures int } +// healthCheckFailureThreshold is the number of consecutive transient +// health-check failures we tolerate before marking the server Error. +// With a 30-second tick this is ~90s of unreachability — long enough that a +// real outage still surfaces promptly, short enough that one slow upstream +// request doesn't paint the UI red and clear the tools list. +const healthCheckFailureThreshold = 3 + // NewClient creates a new managed client with state management func NewClient(id string, serverConfig *config.ServerConfig, logger *zap.Logger, logConfig *config.LogConfig, globalConfig *config.Config, storage *storage.BoltDB, secretResolver *secret.Resolver) (*Client, error) { // Create core client @@ -229,6 +243,11 @@ func (mc *Client) Connect(ctx context.Context) error { mc.StateManager.TransitionTo(types.StateReady) } + // Wipe any consecutive-failure debt accumulated before reconnect so the + // new session starts at zero. Without this, a server that flapped, then + // recovered, would carry stale counts into the next health-check window. + mc.resetHealthCheckFailures() + // Update state manager with server info if serverInfo := mc.coreClient.GetServerInfo(); serverInfo != nil { mc.StateManager.SetServerInfo(serverInfo.ServerInfo.Name, serverInfo.ServerInfo.Version) @@ -831,10 +850,19 @@ func (mc *Client) performHealthCheck() { if err != nil { // Only mark as error if it's a real connection issue, not timeout during high activity if mc.isConnectionError(err) { - mc.logger.Warn("Health check failed with connection error, marking as error", - zap.String("server", mc.Config.Name), - zap.Error(err)) - mc.StateManager.SetError(err) + if mc.recordHealthCheckFailure(err) { + mc.logger.Warn("Health check failed repeatedly, marking as error", + zap.String("server", mc.Config.Name), + zap.Int("consecutive_failures", mc.consecutiveHealthFailures), + zap.Error(err)) + mc.StateManager.SetError(err) + } else { + mc.logger.Info("Health check failed transiently, tolerating below threshold", + zap.String("server", mc.Config.Name), + zap.Int("consecutive_failures", mc.consecutiveHealthFailures), + zap.Int("threshold", healthCheckFailureThreshold), + zap.Error(err)) + } } else { mc.logger.Debug("Health check failed with timeout (high activity), ignoring", zap.String("server", mc.Config.Name), @@ -843,10 +871,74 @@ func (mc *Client) performHealthCheck() { return } + mc.recordHealthCheckSuccess() mc.logger.Debug("Health check passed successfully", zap.String("server", mc.Config.Name)) } +// recordHealthCheckFailure increments the consecutive-failure counter and +// returns whether the caller should now flip the state machine to Error. +// +// Transient errors (timeout, deadline exceeded, context canceled) need +// healthCheckFailureThreshold consecutive misses before they're considered a +// real outage — slow upstreams (e.g. hf.co/mcp under load) routinely miss a +// single 5-second health-check window without actually being down. Hard +// failures (connection refused, host unreachable, DNS gone) trigger Error +// immediately because waiting buys nothing — the server is genuinely +// unreachable and the user should see that. +func (mc *Client) recordHealthCheckFailure(err error) bool { + mc.consecutiveHealthFailures++ + if !isTransientHealthCheckError(err) { + return true + } + return mc.consecutiveHealthFailures >= healthCheckFailureThreshold +} + +// recordHealthCheckSuccess resets the consecutive-failure counter. One good +// check is enough to wipe the slate — we're not trying to track flap +// frequency, just preventing single misses from looking like outages. +func (mc *Client) recordHealthCheckSuccess() { + mc.consecutiveHealthFailures = 0 +} + +// resetHealthCheckFailures clears the counter. Called from the connect +// success path so a successful reconnect doesn't carry stale failure debt +// from before the disconnect. +func (mc *Client) resetHealthCheckFailures() { + mc.consecutiveHealthFailures = 0 +} + +// isTransientHealthCheckError identifies failure modes that warrant +// flap-resistance — slow upstream / momentary timeout — vs. hard failures +// that should surface to the user immediately. +func isTransientHealthCheckError(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + // Hard failures: short-circuit to "not transient" so the caller flips + // Error on the first miss. Order matters — check these BEFORE the + // generic timeout heuristics below. + switch { + case strings.Contains(msg, "connection refused"), + strings.Contains(msg, "no such host"), + strings.Contains(msg, "network is unreachable"), + strings.Contains(msg, "no route to host"), + strings.Contains(msg, "connection reset"), + strings.Contains(msg, "broken pipe"), + strings.Contains(msg, "econnrefused"): + return false + } + // Soft failures: short-window misses we want to tolerate. + switch { + case strings.Contains(msg, "deadline exceeded"), + strings.Contains(msg, "timeout"), + strings.Contains(msg, "context canceled"): + return true + } + return false +} + // RefreshOAuthTokenDirect forces an OAuth token refresh without reconnecting. // This delegates to the core client's direct refresh implementation. // Used by the RefreshManager for proactive token refresh before expiration. diff --git a/internal/upstream/managed/health_flap_test.go b/internal/upstream/managed/health_flap_test.go new file mode 100644 index 00000000..b2b9dcd2 --- /dev/null +++ b/internal/upstream/managed/health_flap_test.go @@ -0,0 +1,126 @@ +package managed + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/upstream/types" +) + +func newTestClientForHealth(t *testing.T) *Client { + t.Helper() + mc := &Client{ + Config: &config.ServerConfig{Name: "flap-server"}, + logger: zap.NewNop(), + } + mc.StateManager = types.NewStateManager() + mc.StateManager.TransitionTo(types.StateConnecting) + mc.StateManager.TransitionTo(types.StateReady) + return mc +} + +// TestHealthCheck_TransientTimeoutToleratedBelowThreshold verifies that a +// single transient timeout (the slow-upstream case that caused +// MCPX_UNKNOWN_UNCLASSIFIED to surface in the UI on every disable-tool toggle) +// does NOT flip the server to Error state. The state must remain Ready until +// healthCheckFailureThreshold consecutive transient failures have accumulated. +func TestHealthCheck_TransientTimeoutToleratedBelowThreshold(t *testing.T) { + mc := newTestClientForHealth(t) + timeoutErr := errors.New(`failed to list tools: transport error: failed to send request: failed to send request: Post "https://hf.co/mcp": context deadline exceeded`) + + // First (threshold-1) failures must be tolerated — counter increments + // but state stays Ready. + for i := 1; i < healthCheckFailureThreshold; i++ { + shouldError := mc.recordHealthCheckFailure(timeoutErr) + assert.False(t, shouldError, "transient failure #%d should be tolerated", i) + assert.Equal(t, types.StateReady, mc.StateManager.GetState(), + "state should stay Ready after transient failure #%d", i) + } + + // The Nth consecutive failure tips us over. + shouldError := mc.recordHealthCheckFailure(timeoutErr) + assert.True(t, shouldError, "Nth transient failure should trigger Error transition") +} + +// TestHealthCheck_HardErrorTriggersImmediateError verifies that hard +// connection failures (connection refused, host unreachable) bypass the +// flap-resistance threshold — the server is genuinely down and the user +// should see that immediately, not wait 90 seconds. +func TestHealthCheck_HardErrorTriggersImmediateError(t *testing.T) { + mc := newTestClientForHealth(t) + hardErr := errors.New("dial tcp 127.0.0.1:65535: connect: connection refused") + + shouldError := mc.recordHealthCheckFailure(hardErr) + assert.True(t, shouldError, "hard error must trigger Error on first occurrence") +} + +// TestHealthCheck_SuccessResetsCounter verifies that after a transient +// failure, a successful health check resets the consecutive-failure counter +// — so two failures spaced by a recovery don't add up to the threshold. +func TestHealthCheck_SuccessResetsCounter(t *testing.T) { + mc := newTestClientForHealth(t) + timeoutErr := errors.New("transport error: context deadline exceeded") + + // Accumulate threshold-1 failures. + for i := 1; i < healthCheckFailureThreshold; i++ { + mc.recordHealthCheckFailure(timeoutErr) + } + require.Equal(t, healthCheckFailureThreshold-1, mc.consecutiveHealthFailures) + + // One success wipes the slate. + mc.recordHealthCheckSuccess() + assert.Equal(t, 0, mc.consecutiveHealthFailures) + + // Now the next transient failure must be back to "tolerated". + shouldError := mc.recordHealthCheckFailure(timeoutErr) + assert.False(t, shouldError, "first failure after recovery must be tolerated again") +} + +// TestIsTransientHealthCheckError covers the categorisation that gates the +// flap-resistance behavior. +func TestIsTransientHealthCheckError(t *testing.T) { + cases := []struct { + name string + err error + transient bool + }{ + {"context deadline exceeded", errors.New("Post: context deadline exceeded"), true}, + {"explicit timeout word", errors.New("net/http: request timeout"), true}, + {"context canceled", errors.New("operation: context canceled"), true}, + {"connection refused", errors.New("dial: connection refused"), false}, + {"no such host", errors.New("dial tcp: lookup nope.invalid: no such host"), false}, + {"network unreachable", errors.New("network is unreachable"), false}, + {"connection reset", errors.New("read: connection reset by peer"), false}, + {"nil error", nil, false}, + } + for _, tc := range cases { + got := isTransientHealthCheckError(tc.err) + if got != tc.transient { + t.Errorf("%s: isTransientHealthCheckError = %v, want %v", tc.name, got, tc.transient) + } + } +} + +// TestHealthCheck_ResetOnConnect verifies that a fresh Connect() clears the +// consecutive-failure counter even if a previous session accumulated some. +// Important so reconnect cycles don't carry stale failure debt. +func TestHealthCheck_ResetOnConnect(t *testing.T) { + mc := newTestClientForHealth(t) + timeoutErr := errors.New("context deadline exceeded") + + mc.recordHealthCheckFailure(timeoutErr) + mc.recordHealthCheckFailure(timeoutErr) + require.Equal(t, 2, mc.consecutiveHealthFailures) + + mc.resetHealthCheckFailures() + assert.Equal(t, 0, mc.consecutiveHealthFailures) + + // Use ctx to silence unused import warnings on stripped builds. + _ = context.Background() +}