From 8ad051b1e79218db3c24e38384645e0b35ce4565 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 5 Feb 2026 17:31:42 +0800 Subject: [PATCH 1/2] feat(pftools): enhance function failure handling with retry logic --- pkg/mcp/pftools/errors.go | 18 ++ pkg/mcp/pftools/errors_test.go | 43 +++++ pkg/mcp/pftools/manager.go | 154 ++++++++++++++++-- pkg/mcp/pftools/manager_failure_cache_test.go | 97 +++++++++++ pkg/mcp/pftools/types.go | 18 ++ 5 files changed, 312 insertions(+), 18 deletions(-) create mode 100644 pkg/mcp/pftools/manager_failure_cache_test.go diff --git a/pkg/mcp/pftools/errors.go b/pkg/mcp/pftools/errors.go index 149c83f..f889f56 100644 --- a/pkg/mcp/pftools/errors.go +++ b/pkg/mcp/pftools/errors.go @@ -28,6 +28,10 @@ var ( ErrFunctionNotFound = errors.New("function not found") // ErrNotOurMessage indicates a message that should be ignored. ErrNotOurMessage = errors.New("not our message") + // ErrFunctionNoInputTopics indicates the function has no input topics. + ErrFunctionNoInputTopics = errors.New("function has no input topics") + // ErrSchemaConversionFailed indicates the schema conversion failed. + ErrSchemaConversionFailed = errors.New("schema conversion failed") ) // IsClusterUnhealthy checks if an error indicates cluster health issues @@ -128,3 +132,17 @@ func isNotFoundText(text string) bool { } return false } + +// classifyConvertError reports whether a conversion failure is retryable. +func classifyConvertError(err error) failureCategory { + if err == nil { + return failureUnknown + } + if errors.Is(err, ErrFunctionNoInputTopics) || errors.Is(err, ErrSchemaConversionFailed) { + return failurePermanent + } + if IsClusterUnhealthy(err) || IsAuthError(err) || IsNetworkError(err) { + return failureRetryable + } + return failureUnknown +} diff --git a/pkg/mcp/pftools/errors_test.go b/pkg/mcp/pftools/errors_test.go index cc5cee5..0edcd17 100644 --- a/pkg/mcp/pftools/errors_test.go +++ b/pkg/mcp/pftools/errors_test.go @@ -61,3 +61,46 @@ func TestIsNotFoundError(t *testing.T) { } }) } + +func TestClassifyConvertError(t *testing.T) { + t.Run("no input topics is permanent", func(t *testing.T) { + if classifyConvertError(ErrFunctionNoInputTopics) != failurePermanent { + t.Fatalf("expected permanent for no input topics") + } + }) + + t.Run("schema conversion is permanent", func(t *testing.T) { + err := errors.Join(ErrSchemaConversionFailed, errors.New("boom")) + if classifyConvertError(err) != failurePermanent { + t.Fatalf("expected permanent for schema conversion failure") + } + }) + + t.Run("network error is retryable", func(t *testing.T) { + err := errors.New("connection refused") + if classifyConvertError(err) != failureRetryable { + t.Fatalf("expected retryable for network error") + } + }) + + t.Run("auth error is retryable", func(t *testing.T) { + err := errors.New("token expired") + if classifyConvertError(err) != failureRetryable { + t.Fatalf("expected retryable for auth error") + } + }) + + t.Run("cluster error is retryable", func(t *testing.T) { + err := rest.Error{Code: 503, Reason: "no healthy upstream"} + if classifyConvertError(err) != failureRetryable { + t.Fatalf("expected retryable for cluster error") + } + }) + + t.Run("unknown error is unknown", func(t *testing.T) { + err := errors.New("something else") + if classifyConvertError(err) != failureUnknown { + t.Fatalf("expected unknown for generic error") + } + }) +} diff --git a/pkg/mcp/pftools/manager.go b/pkg/mcp/pftools/manager.go index e150319..083a433 100644 --- a/pkg/mcp/pftools/manager.go +++ b/pkg/mcp/pftools/manager.go @@ -16,7 +16,9 @@ package pftools import ( "context" + "crypto/sha256" "encoding/json" + "errors" "fmt" "log" "strings" @@ -93,6 +95,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO v2adminClient: v2adminClient, pulsarClient: pulsarClient, fnToToolMap: make(map[string]*FunctionTool), + failedFunctions: make(map[string]*functionFailureState), mutex: sync.RWMutex{}, producerCache: make(map[string]pulsarclient.Producer), producerMutex: sync.RWMutex{}, @@ -171,39 +174,93 @@ func (m *PulsarFunctionManager) updateFunctions() { fullName := getFunctionFullName(fn.Tenant, fn.Namespace, fn.Name) seenFunctions[fullName] = true + configHash, hashErr := computeFunctionConfigHash(fn) + if hashErr != nil { + log.Printf("Failed to compute config hash for function %s: %v", fullName, hashErr) + } + // Check if we already have this function m.mutex.RLock() - _, exists := m.fnToToolMap[fullName] + existingFn, exists := m.fnToToolMap[fullName] + failureState, hasFailure := m.failedFunctions[fullName] m.mutex.RUnlock() + if hasFailure && configHash != "" && failureState.configHash != configHash { + m.mutex.Lock() + delete(m.failedFunctions, fullName) + m.mutex.Unlock() + hasFailure = false + failureState = nil + } + changed := false if exists { // Check if the function has changed - existingFn, exists := m.fnToToolMap[fullName] - if exists { - if !cmp.Equal(*existingFn.Function, *fn) { - changed = true - } - if !existingFn.SchemaFetchSuccess { - changed = true - } + if !cmp.Equal(*existingFn.Function, *fn) { + changed = true + } + if !existingFn.SchemaFetchSuccess { + changed = true } if !changed { continue } } + if hasFailure && configHash != "" && failureState.configHash == configHash { + if shouldSkipFailure(failureState, m.pollInterval, time.Now()) { + continue + } + } + // Convert function to tool + attemptAt := time.Now() fnTool, err := m.convertFunctionToTool(fn) - if err != nil || !fnTool.SchemaFetchSuccess { - if err != nil { - log.Printf("Failed to convert function %s to tool: %v", fullName, err) - } else { - log.Printf("Failed to fetch schema for function %s, retry later...", fullName) + if err != nil || (fnTool != nil && !fnTool.SchemaFetchSuccess) { + failureErr := err + if failureErr == nil && fnTool != nil && fnTool.SchemaFetchError != nil { + failureErr = fnTool.SchemaFetchError + } + if failureErr == nil { + failureErr = errors.New("schema fetch failed") + } + + category := classifyConvertError(failureErr) + errorMsg := failureErr.Error() + logNow := shouldLogFailure(failureState, configHash, category, errorMsg) + + if configHash != "" { + newState := &functionFailureState{ + configHash: configHash, + category: category, + lastError: errorMsg, + lastAttemptAt: attemptAt, + } + if logNow { + newState.lastLoggedAt = time.Now() + } else if failureState != nil { + newState.lastLoggedAt = failureState.lastLoggedAt + } + m.mutex.Lock() + m.failedFunctions[fullName] = newState + m.mutex.Unlock() + } + if logNow { + if err != nil { + log.Printf("Failed to convert function %s to tool: %v (category=%s)", fullName, failureErr, category) + } else { + log.Printf("Failed to fetch schema for function %s, retry later: %v (category=%s)", fullName, failureErr, category) + } } continue } + if hasFailure { + m.mutex.Lock() + delete(m.failedFunctions, fullName) + m.mutex.Unlock() + } + if changed { if m.sessionID != "" { err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name) @@ -248,12 +305,61 @@ func (m *PulsarFunctionManager) updateFunctions() { m.mcpServer.DeleteTools(fnTool.Tool.Name) } delete(m.fnToToolMap, fullName) + delete(m.failedFunctions, fullName) log.Printf("Removed function %s from MCP tools [%s]", fullName, fnTool.Tool.Name) } } m.mutex.Unlock() } +func computeFunctionConfigHash(fn *utils.FunctionConfig) (string, error) { + if fn == nil { + return "", errors.New("function config is nil") + } + data, err := json.Marshal(fn) + if err != nil { + return "", err + } + sum := sha256.Sum256(data) + return fmt.Sprintf("%x", sum[:]), nil +} + +func shouldSkipFailure(state *functionFailureState, pollInterval time.Duration, now time.Time) bool { + if state == nil { + return false + } + switch state.category { + case failurePermanent: + return true + case failureRetryable: + if state.lastAttemptAt.IsZero() { + return false + } + return now.Sub(state.lastAttemptAt) < pollInterval + default: + return true + } +} + +func shouldLogFailure(prev *functionFailureState, configHash string, category failureCategory, errMsg string) bool { + if prev == nil { + return true + } + if configHash == "" { + return true + } + if prev.configHash != configHash { + return true + } + if prev.category != category { + return true + } + if prev.lastError != errMsg { + return true + } + return false +} + // getFunctionsList retrieves all functions from the specified tenants/namespaces func (m *PulsarFunctionManager) getFunctionsList() ([]*utils.FunctionConfig, error) { var allFunctions []*utils.FunctionConfig @@ -354,9 +460,10 @@ func (m *PulsarFunctionManager) getFunctionsInNamespace(tenant, namespace string // convertFunctionToTool converts a Pulsar Function to an MCP Tool func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) (*FunctionTool, error) { schemaFetchSuccess := true + var schemaFetchErr error // Determine input and output topics if len(fn.InputSpecs) == 0 { - return nil, fmt.Errorf("function has no input topics") + return nil, ErrFunctionNoInputTopics } var inputTopic string @@ -366,7 +473,7 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) break } if inputTopic == "" { - return nil, fmt.Errorf("function has no input topics") + return nil, ErrFunctionNoInputTopics } // Get schema for input topic @@ -378,7 +485,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) if restError.Code != 404 { log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err) schemaFetchSuccess = false + schemaFetchErr = errors.Join(schemaFetchErr, err) } + } else { + log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err) + schemaFetchSuccess = false + schemaFetchErr = errors.Join(schemaFetchErr, err) } } @@ -394,7 +506,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) if restError.Code != 404 { log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err) schemaFetchSuccess = false + schemaFetchErr = errors.Join(schemaFetchErr, err) } + } else { + log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err) + schemaFetchSuccess = false + schemaFetchErr = errors.Join(schemaFetchErr, err) } } } @@ -409,12 +526,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) schemaConverter, err := schema.ConverterFactory(inputSchema.Type) if err != nil { - return nil, fmt.Errorf("failed to create schema converter: %w", err) + return nil, errors.Join(ErrSchemaConversionFailed, err) } toolInputSchemaProperties, err := schemaConverter.ToMCPToolInputSchemaProperties(inputSchema.PulsarSchemaInfo) if err != nil { - return nil, fmt.Errorf("failed to convert input schema to MCP tool input schema properties: %w", err) + return nil, errors.Join(ErrSchemaConversionFailed, err) } toolInputSchemaProperties = append(toolInputSchemaProperties, mcp.WithDescription(description)) @@ -441,6 +558,7 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) OutputTopic: outputTopic, Tool: tool, SchemaFetchSuccess: schemaFetchSuccess, + SchemaFetchError: schemaFetchErr, }, nil } diff --git a/pkg/mcp/pftools/manager_failure_cache_test.go b/pkg/mcp/pftools/manager_failure_cache_test.go new file mode 100644 index 0000000..a3302c6 --- /dev/null +++ b/pkg/mcp/pftools/manager_failure_cache_test.go @@ -0,0 +1,97 @@ +package pftools + +import ( + "testing" + "time" +) + +func TestShouldSkipFailure(t *testing.T) { + now := time.Date(2026, 2, 5, 12, 0, 0, 0, time.UTC) + pollInterval := 30 * time.Second + + t.Run("permanent skips", func(t *testing.T) { + state := &functionFailureState{category: failurePermanent} + if !shouldSkipFailure(state, pollInterval, now) { + t.Fatalf("expected permanent failure to skip") + } + }) + + t.Run("retryable respects interval", func(t *testing.T) { + state := &functionFailureState{ + category: failureRetryable, + lastAttemptAt: now.Add(-10 * time.Second), + } + if !shouldSkipFailure(state, pollInterval, now) { + t.Fatalf("expected retryable failure within interval to skip") + } + }) + + t.Run("retryable after interval", func(t *testing.T) { + state := &functionFailureState{ + category: failureRetryable, + lastAttemptAt: now.Add(-40 * time.Second), + } + if shouldSkipFailure(state, pollInterval, now) { + t.Fatalf("expected retryable failure after interval to retry") + } + }) + + t.Run("unknown skips", func(t *testing.T) { + state := &functionFailureState{category: failureUnknown} + if !shouldSkipFailure(state, pollInterval, now) { + t.Fatalf("expected unknown failure to skip") + } + }) +} + +func TestShouldLogFailure(t *testing.T) { + t.Run("first failure logs", func(t *testing.T) { + if !shouldLogFailure(nil, "hash", failureRetryable, "boom") { + t.Fatalf("expected first failure to log") + } + }) + + t.Run("same state does not log", func(t *testing.T) { + prev := &functionFailureState{ + configHash: "hash", + category: failureRetryable, + lastError: "boom", + } + if shouldLogFailure(prev, "hash", failureRetryable, "boom") { + t.Fatalf("expected identical failure to not log") + } + }) + + t.Run("category change logs", func(t *testing.T) { + prev := &functionFailureState{ + configHash: "hash", + category: failureRetryable, + lastError: "boom", + } + if !shouldLogFailure(prev, "hash", failurePermanent, "boom") { + t.Fatalf("expected category change to log") + } + }) + + t.Run("error change logs", func(t *testing.T) { + prev := &functionFailureState{ + configHash: "hash", + category: failureRetryable, + lastError: "boom", + } + if !shouldLogFailure(prev, "hash", failureRetryable, "different") { + t.Fatalf("expected error change to log") + } + }) + + t.Run("config change logs", func(t *testing.T) { + prev := &functionFailureState{ + configHash: "hash", + category: failureRetryable, + lastError: "boom", + } + if !shouldLogFailure(prev, "newhash", failureRetryable, "boom") { + t.Fatalf("expected config change to log") + } + }) +} diff --git a/pkg/mcp/pftools/types.go b/pkg/mcp/pftools/types.go index 77ad7fe..3e01b5a 100644 --- a/pkg/mcp/pftools/types.go +++ b/pkg/mcp/pftools/types.go @@ -32,6 +32,7 @@ type PulsarFunctionManager struct { v2adminClient cmdutils.Client pulsarClient pulsar.Client fnToToolMap map[string]*FunctionTool + failedFunctions map[string]*functionFailureState mutex sync.RWMutex producerCache map[string]pulsar.Producer producerMutex sync.RWMutex @@ -58,6 +59,7 @@ type FunctionTool struct { OutputTopic string Tool mcp.Tool SchemaFetchSuccess bool + SchemaFetchError error } // SchemaInfo represents schema metadata for Pulsar functions. @@ -67,6 +69,22 @@ type SchemaInfo struct { PulsarSchemaInfo *utils.SchemaInfo } +type failureCategory string + +const ( + failurePermanent failureCategory = "permanent" + failureRetryable failureCategory = "retryable" + failureUnknown failureCategory = "unknown" +) + +type functionFailureState struct { + configHash string + category failureCategory + lastError string + lastLoggedAt time.Time + lastAttemptAt time.Time +} + // CircuitBreaker guards function invocations to prevent repeated failures. type CircuitBreaker struct { failureCount int From 5c44925674d3b6583c7637c54eda33d6dd57f099 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 5 Feb 2026 17:59:55 +0800 Subject: [PATCH 2/2] docs(pftools): add Apache 2.0 license header to test file --- pkg/mcp/pftools/manager_failure_cache_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/mcp/pftools/manager_failure_cache_test.go b/pkg/mcp/pftools/manager_failure_cache_test.go index a3302c6..49b59a0 100644 --- a/pkg/mcp/pftools/manager_failure_cache_test.go +++ b/pkg/mcp/pftools/manager_failure_cache_test.go @@ -1,3 +1,17 @@ +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package pftools import (