100 changes: 96 additions & 4 deletions internal/upstream/managed/client.go
@@ -64,8 +64,22 @@ type Client struct {

// Tool discovery callback for notifications/tools/list_changed handling
toolDiscoveryCallback func(ctx context.Context, serverName string) error

// consecutiveHealthFailures counts back-to-back transient health-check
// failures. The state machine flips to Error only once this counter reaches
// healthCheckFailureThreshold; a single success resets it. Hard failures
// (connection refused, no such host, unreachable) bypass the counter
// and trigger Error immediately. See recordHealthCheckFailure().
consecutiveHealthFailures int
}

// healthCheckFailureThreshold is the number of consecutive transient
// health-check failures we tolerate before marking the server Error.
// With a 30-second tick, this is ~90s of unreachability: short enough that a
// real outage still surfaces promptly, long enough that one slow upstream
// request doesn't paint the UI red and clear the tools list.
const healthCheckFailureThreshold = 3
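
// Illustrative aside, not part of this change: the worst-case detection
// latency implied by the comment above, worked out as constants. The interval
// constant is an assumption for illustration (the real ticker is configured
// elsewhere in this package) and needs the time import.
const assumedHealthCheckInterval = 30 * time.Second
const worstCaseTransientDetection = healthCheckFailureThreshold * assumedHealthCheckInterval // 3 × 30s = 90s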

// NewClient creates a new managed client with state management
func NewClient(id string, serverConfig *config.ServerConfig, logger *zap.Logger, logConfig *config.LogConfig, globalConfig *config.Config, storage *storage.BoltDB, secretResolver *secret.Resolver) (*Client, error) {
// Create core client
@@ -229,6 +243,11 @@ func (mc *Client) Connect(ctx context.Context) error {
mc.StateManager.TransitionTo(types.StateReady)
}

// Wipe any consecutive-failure debt accumulated before reconnect so the
// new session starts at zero. Without this, a server that flapped, then
// recovered, would carry stale counts into the next health-check window.
mc.resetHealthCheckFailures()

// Update state manager with server info
if serverInfo := mc.coreClient.GetServerInfo(); serverInfo != nil {
mc.StateManager.SetServerInfo(serverInfo.ServerInfo.Name, serverInfo.ServerInfo.Version)
@@ -831,10 +850,19 @@ func (mc *Client) performHealthCheck() {
if err != nil {
// Only mark as error if it's a real connection issue, not timeout during high activity
if mc.isConnectionError(err) {
-mc.logger.Warn("Health check failed with connection error, marking as error",
-zap.String("server", mc.Config.Name),
-zap.Error(err))
-mc.StateManager.SetError(err)
if mc.recordHealthCheckFailure(err) {
mc.logger.Warn("Health check failed repeatedly, marking as error",
zap.String("server", mc.Config.Name),
zap.Int("consecutive_failures", mc.consecutiveHealthFailures),
zap.Error(err))
mc.StateManager.SetError(err)
} else {
mc.logger.Info("Health check failed transiently, tolerating below threshold",
zap.String("server", mc.Config.Name),
zap.Int("consecutive_failures", mc.consecutiveHealthFailures),
zap.Int("threshold", healthCheckFailureThreshold),
zap.Error(err))
}
} else {
mc.logger.Debug("Health check failed with timeout (high activity), ignoring",
zap.String("server", mc.Config.Name),
@@ -843,10 +871,74 @@ func (mc *Client) performHealthCheck() {
return
}

mc.recordHealthCheckSuccess()
mc.logger.Debug("Health check passed successfully",
zap.String("server", mc.Config.Name))
}
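
// Illustrative aside, not part of this change: a minimal sketch of how a
// periodic ticker might drive performHealthCheck. The real loop lives
// elsewhere in this package; the method name and the 30-second interval are
// assumptions for illustration (requires the context and time imports).
func (mc *Client) healthCheckLoopSketch(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // shutdown: stop checking once the client's context ends
		case <-ticker.C:
			mc.performHealthCheck() // each tick runs one check; the counter above does the rest
		}
	}
}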

// recordHealthCheckFailure increments the consecutive-failure counter and
// returns whether the caller should now flip the state machine to Error.
//
// Transient errors (timeout, deadline exceeded, context canceled) need
// healthCheckFailureThreshold consecutive misses before they're considered a
// real outage — slow upstreams (e.g. hf.co/mcp under load) routinely miss a
// single 5-second health-check window without actually being down. Hard
// failures (connection refused, host unreachable, DNS gone) trigger Error
// immediately because waiting buys nothing — the server is genuinely
// unreachable and the user should see that.
func (mc *Client) recordHealthCheckFailure(err error) bool {
mc.consecutiveHealthFailures++
if !isTransientHealthCheckError(err) {
return true
}
return mc.consecutiveHealthFailures >= healthCheckFailureThreshold
}

// recordHealthCheckSuccess resets the consecutive-failure counter. One good
// check is enough to wipe the slate — we're not trying to track flap
// frequency, just preventing single misses from looking like outages.
func (mc *Client) recordHealthCheckSuccess() {
mc.consecutiveHealthFailures = 0
}

// resetHealthCheckFailures clears the counter. Called from the connect
// success path so a successful reconnect doesn't carry stale failure debt
// from before the disconnect.
func (mc *Client) resetHealthCheckFailures() {
mc.consecutiveHealthFailures = 0
}

// isTransientHealthCheckError identifies failure modes that warrant
// flap-resistance — slow upstream / momentary timeout — vs. hard failures
// that should surface to the user immediately.
func isTransientHealthCheckError(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
// Hard failures: short-circuit to "not transient" so the caller flips
// Error on the first miss. Order matters — check these BEFORE the
// generic timeout heuristics below.
switch {
case strings.Contains(msg, "connection refused"),
strings.Contains(msg, "no such host"),
strings.Contains(msg, "network is unreachable"),
strings.Contains(msg, "no route to host"),
strings.Contains(msg, "connection reset"),
strings.Contains(msg, "broken pipe"),
strings.Contains(msg, "econnrefused"):
return false
}
// Soft failures: short-window misses we want to tolerate.
switch {
case strings.Contains(msg, "deadline exceeded"),
strings.Contains(msg, "timeout"),
strings.Contains(msg, "context canceled"):
return true
}
return false
}
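
// Illustrative aside, not part of this change: substring matching is the
// pragmatic choice here because the transport appears to flatten errors into
// formatted strings (see the fixtures in health_flap_test.go). If the error
// chain were instead preserved with %w, a typed classifier along these lines
// would be sturdier (assumes the errors, context, and net imports):
func isTransientTypedSketch(err error) bool {
	// Typed sentinel checks replace the "deadline exceeded" / "context
	// canceled" substrings.
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		return true
	}
	// Any net.Error reporting Timeout() covers the generic "timeout" case.
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return true
	}
	return false
}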

// RefreshOAuthTokenDirect forces an OAuth token refresh without reconnecting.
// This delegates to the core client's direct refresh implementation.
// Used by the RefreshManager for proactive token refresh before expiration.
126 changes: 126 additions & 0 deletions internal/upstream/managed/health_flap_test.go
@@ -0,0 +1,126 @@
package managed

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/smart-mcp-proxy/mcpproxy-go/internal/config"
"github.com/smart-mcp-proxy/mcpproxy-go/internal/upstream/types"
)

func newTestClientForHealth(t *testing.T) *Client {
t.Helper()
mc := &Client{
Config: &config.ServerConfig{Name: "flap-server"},
logger: zap.NewNop(),
}
mc.StateManager = types.NewStateManager()
mc.StateManager.TransitionTo(types.StateConnecting)
mc.StateManager.TransitionTo(types.StateReady)
return mc
}

// TestHealthCheck_TransientTimeoutToleratedBelowThreshold verifies that a
// single transient timeout (the slow-upstream case that caused
// MCPX_UNKNOWN_UNCLASSIFIED to surface in the UI on every disable-tool toggle)
// does NOT flip the server to Error state. The state must remain Ready until
// healthCheckFailureThreshold consecutive transient failures have accumulated.
func TestHealthCheck_TransientTimeoutToleratedBelowThreshold(t *testing.T) {
mc := newTestClientForHealth(t)
timeoutErr := errors.New(`failed to list tools: transport error: failed to send request: failed to send request: Post "https://hf.co/mcp": context deadline exceeded`)

// First (threshold-1) failures must be tolerated — counter increments
// but state stays Ready.
for i := 1; i < healthCheckFailureThreshold; i++ {
shouldError := mc.recordHealthCheckFailure(timeoutErr)
assert.False(t, shouldError, "transient failure #%d should be tolerated", i)
assert.Equal(t, types.StateReady, mc.StateManager.GetState(),
"state should stay Ready after transient failure #%d", i)
}

// The Nth consecutive failure tips us over.
shouldError := mc.recordHealthCheckFailure(timeoutErr)
assert.True(t, shouldError, "Nth transient failure should trigger Error transition")
}

// TestHealthCheck_HardErrorTriggersImmediateError verifies that hard
// connection failures (connection refused, host unreachable) bypass the
// flap-resistance threshold — the server is genuinely down and the user
// should see that immediately, not wait 90 seconds.
func TestHealthCheck_HardErrorTriggersImmediateError(t *testing.T) {
mc := newTestClientForHealth(t)
hardErr := errors.New("dial tcp 127.0.0.1:65535: connect: connection refused")

shouldError := mc.recordHealthCheckFailure(hardErr)
assert.True(t, shouldError, "hard error must trigger Error on first occurrence")
}

// TestHealthCheck_SuccessResetsCounter verifies that after a transient
// failure, a successful health check resets the consecutive-failure counter
// — so two failures spaced by a recovery don't add up to the threshold.
func TestHealthCheck_SuccessResetsCounter(t *testing.T) {
mc := newTestClientForHealth(t)
timeoutErr := errors.New("transport error: context deadline exceeded")

// Accumulate threshold-1 failures.
for i := 1; i < healthCheckFailureThreshold; i++ {
mc.recordHealthCheckFailure(timeoutErr)
}
require.Equal(t, healthCheckFailureThreshold-1, mc.consecutiveHealthFailures)

// One success wipes the slate.
mc.recordHealthCheckSuccess()
assert.Equal(t, 0, mc.consecutiveHealthFailures)

// Now the next transient failure must be back to "tolerated".
shouldError := mc.recordHealthCheckFailure(timeoutErr)
assert.False(t, shouldError, "first failure after recovery must be tolerated again")
}

// TestIsTransientHealthCheckError covers the categorization that gates the
// flap-resistance behavior.
func TestIsTransientHealthCheckError(t *testing.T) {
cases := []struct {
name string
err error
transient bool
}{
{"context deadline exceeded", errors.New("Post: context deadline exceeded"), true},
{"explicit timeout word", errors.New("net/http: request timeout"), true},
{"context canceled", errors.New("operation: context canceled"), true},
{"connection refused", errors.New("dial: connection refused"), false},
{"no such host", errors.New("dial tcp: lookup nope.invalid: no such host"), false},
{"network unreachable", errors.New("network is unreachable"), false},
{"connection reset", errors.New("read: connection reset by peer"), false},
{"nil error", nil, false},
}
for _, tc := range cases {
got := isTransientHealthCheckError(tc.err)
if got != tc.transient {
t.Errorf("%s: isTransientHealthCheckError = %v, want %v", tc.name, got, tc.transient)
}
}
}

// TestHealthCheck_ResetOnConnect verifies that a fresh Connect() clears the
// consecutive-failure counter even if a previous session accumulated some.
// Important so reconnect cycles don't carry stale failure debt.
func TestHealthCheck_ResetOnConnect(t *testing.T) {
mc := newTestClientForHealth(t)
timeoutErr := errors.New("context deadline exceeded")

mc.recordHealthCheckFailure(timeoutErr)
mc.recordHealthCheckFailure(timeoutErr)
require.Equal(t, 2, mc.consecutiveHealthFailures)

mc.resetHealthCheckFailures()
assert.Equal(t, 0, mc.consecutiveHealthFailures)
}
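
These cases drive the failure counter and classifier directly, so they need no network or live upstream. A usage note, assuming the module layout implied by the import paths above: go test ./internal/upstream/managed -run 'TestHealthCheck|TestIsTransient' -v runs just this file's tests.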