Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/mcp/pftools/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ var (
ErrFunctionNotFound = errors.New("function not found")
// ErrNotOurMessage indicates a message that should be ignored.
ErrNotOurMessage = errors.New("not our message")
// ErrFunctionNoInputTopics indicates the function has no input topics.
ErrFunctionNoInputTopics = errors.New("function has no input topics")
// ErrSchemaConversionFailed indicates the schema conversion failed.
ErrSchemaConversionFailed = errors.New("schema conversion failed")
)

// IsClusterUnhealthy checks if an error indicates cluster health issues
Expand Down Expand Up @@ -128,3 +132,17 @@ func isNotFoundText(text string) bool {
}
return false
}

// classifyConvertError reports whether a conversion failure is retryable.
func classifyConvertError(err error) failureCategory {
if err == nil {
return failureUnknown
}
if errors.Is(err, ErrFunctionNoInputTopics) || errors.Is(err, ErrSchemaConversionFailed) {
return failurePermanent
}
if IsClusterUnhealthy(err) || IsAuthError(err) || IsNetworkError(err) {
return failureRetryable
}
return failureUnknown
}
43 changes: 43 additions & 0 deletions pkg/mcp/pftools/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,46 @@ func TestIsNotFoundError(t *testing.T) {
}
})
}

func TestClassifyConvertError(t *testing.T) {
t.Run("no input topics is permanent", func(t *testing.T) {
if classifyConvertError(ErrFunctionNoInputTopics) != failurePermanent {
t.Fatalf("expected permanent for no input topics")
}
})

t.Run("schema conversion is permanent", func(t *testing.T) {
err := errors.Join(ErrSchemaConversionFailed, errors.New("boom"))
if classifyConvertError(err) != failurePermanent {
t.Fatalf("expected permanent for schema conversion failure")
}
})

t.Run("network error is retryable", func(t *testing.T) {
err := errors.New("connection refused")
if classifyConvertError(err) != failureRetryable {
t.Fatalf("expected retryable for network error")
}
})

t.Run("auth error is retryable", func(t *testing.T) {
err := errors.New("token expired")
if classifyConvertError(err) != failureRetryable {
t.Fatalf("expected retryable for auth error")
}
})

t.Run("cluster error is retryable", func(t *testing.T) {
err := rest.Error{Code: 503, Reason: "no healthy upstream"}
if classifyConvertError(err) != failureRetryable {
t.Fatalf("expected retryable for cluster error")
}
})

t.Run("unknown error is unknown", func(t *testing.T) {
err := errors.New("something else")
if classifyConvertError(err) != failureUnknown {
t.Fatalf("expected unknown for generic error")
}
})
}
154 changes: 136 additions & 18 deletions pkg/mcp/pftools/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package pftools

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"log"
"strings"
Expand Down Expand Up @@ -93,6 +95,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
v2adminClient: v2adminClient,
pulsarClient: pulsarClient,
fnToToolMap: make(map[string]*FunctionTool),
failedFunctions: make(map[string]*functionFailureState),
mutex: sync.RWMutex{},
producerCache: make(map[string]pulsarclient.Producer),
producerMutex: sync.RWMutex{},
Expand Down Expand Up @@ -171,39 +174,93 @@ func (m *PulsarFunctionManager) updateFunctions() {
fullName := getFunctionFullName(fn.Tenant, fn.Namespace, fn.Name)
seenFunctions[fullName] = true

configHash, hashErr := computeFunctionConfigHash(fn)
if hashErr != nil {
log.Printf("Failed to compute config hash for function %s: %v", fullName, hashErr)
}

// Check if we already have this function
m.mutex.RLock()
_, exists := m.fnToToolMap[fullName]
existingFn, exists := m.fnToToolMap[fullName]
failureState, hasFailure := m.failedFunctions[fullName]
m.mutex.RUnlock()

if hasFailure && configHash != "" && failureState.configHash != configHash {
m.mutex.Lock()
delete(m.failedFunctions, fullName)
m.mutex.Unlock()
hasFailure = false
failureState = nil
}

changed := false
if exists {
// Check if the function has changed
existingFn, exists := m.fnToToolMap[fullName]
if exists {
if !cmp.Equal(*existingFn.Function, *fn) {
changed = true
}
if !existingFn.SchemaFetchSuccess {
changed = true
}
if !cmp.Equal(*existingFn.Function, *fn) {
changed = true
}
if !existingFn.SchemaFetchSuccess {
changed = true
}
if !changed {
continue
}
}

if hasFailure && configHash != "" && failureState.configHash == configHash {
if shouldSkipFailure(failureState, m.pollInterval, time.Now()) {
continue
}
}

// Convert function to tool
attemptAt := time.Now()
fnTool, err := m.convertFunctionToTool(fn)
if err != nil || !fnTool.SchemaFetchSuccess {
if err != nil {
log.Printf("Failed to convert function %s to tool: %v", fullName, err)
} else {
log.Printf("Failed to fetch schema for function %s, retry later...", fullName)
if err != nil || (fnTool != nil && !fnTool.SchemaFetchSuccess) {
failureErr := err
if failureErr == nil && fnTool != nil && fnTool.SchemaFetchError != nil {
failureErr = fnTool.SchemaFetchError
}
if failureErr == nil {
failureErr = errors.New("schema fetch failed")
}

category := classifyConvertError(failureErr)
errorMsg := failureErr.Error()
logNow := shouldLogFailure(failureState, configHash, category, errorMsg)

if configHash != "" {
newState := &functionFailureState{
configHash: configHash,
category: category,
lastError: errorMsg,
lastAttemptAt: attemptAt,
}
if logNow {
newState.lastLoggedAt = time.Now()
} else if failureState != nil {
newState.lastLoggedAt = failureState.lastLoggedAt
}
m.mutex.Lock()
m.failedFunctions[fullName] = newState
m.mutex.Unlock()
}
if logNow {
if err != nil {
log.Printf("Failed to convert function %s to tool: %v (category=%s)", fullName, failureErr, category)
} else {
log.Printf("Failed to fetch schema for function %s, retry later: %v (category=%s)", fullName, failureErr, category)
}
}
continue
}

if hasFailure {
m.mutex.Lock()
delete(m.failedFunctions, fullName)
m.mutex.Unlock()
}

if changed {
if m.sessionID != "" {
err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name)
Expand Down Expand Up @@ -248,12 +305,61 @@ func (m *PulsarFunctionManager) updateFunctions() {
m.mcpServer.DeleteTools(fnTool.Tool.Name)
}
delete(m.fnToToolMap, fullName)
delete(m.failedFunctions, fullName)
log.Printf("Removed function %s from MCP tools [%s]", fullName, fnTool.Tool.Name)
}
}
m.mutex.Unlock()
}

func computeFunctionConfigHash(fn *utils.FunctionConfig) (string, error) {
if fn == nil {
return "", errors.New("function config is nil")
}
data, err := json.Marshal(fn)
if err != nil {
return "", err
}
sum := sha256.Sum256(data)
return fmt.Sprintf("%x", sum[:]), nil
}

func shouldSkipFailure(state *functionFailureState, pollInterval time.Duration, now time.Time) bool {
if state == nil {
return false
}
switch state.category {
case failurePermanent:
return true
case failureRetryable:
if state.lastAttemptAt.IsZero() {
return false
}
return now.Sub(state.lastAttemptAt) < pollInterval
default:
return true
}
}

func shouldLogFailure(prev *functionFailureState, configHash string, category failureCategory, errMsg string) bool {
if prev == nil {
return true
}
if configHash == "" {
return true
}
if prev.configHash != configHash {
return true
}
if prev.category != category {
return true
}
if prev.lastError != errMsg {
return true
}
return false
}

// getFunctionsList retrieves all functions from the specified tenants/namespaces
func (m *PulsarFunctionManager) getFunctionsList() ([]*utils.FunctionConfig, error) {
var allFunctions []*utils.FunctionConfig
Expand Down Expand Up @@ -354,9 +460,10 @@ func (m *PulsarFunctionManager) getFunctionsInNamespace(tenant, namespace string
// convertFunctionToTool converts a Pulsar Function to an MCP Tool
func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) (*FunctionTool, error) {
schemaFetchSuccess := true
var schemaFetchErr error
// Determine input and output topics
if len(fn.InputSpecs) == 0 {
return nil, fmt.Errorf("function has no input topics")
return nil, ErrFunctionNoInputTopics
}

var inputTopic string
Expand All @@ -366,7 +473,7 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
break
}
if inputTopic == "" {
return nil, fmt.Errorf("function has no input topics")
return nil, ErrFunctionNoInputTopics
}

// Get schema for input topic
Expand All @@ -378,7 +485,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
if restError.Code != 404 {
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
} else {
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
}

Expand All @@ -394,7 +506,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
if restError.Code != 404 {
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
} else {
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
}
}
Expand All @@ -409,12 +526,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)

schemaConverter, err := schema.ConverterFactory(inputSchema.Type)
if err != nil {
return nil, fmt.Errorf("failed to create schema converter: %w", err)
return nil, errors.Join(ErrSchemaConversionFailed, err)
}

toolInputSchemaProperties, err := schemaConverter.ToMCPToolInputSchemaProperties(inputSchema.PulsarSchemaInfo)
if err != nil {
return nil, fmt.Errorf("failed to convert input schema to MCP tool input schema properties: %w", err)
return nil, errors.Join(ErrSchemaConversionFailed, err)
}

toolInputSchemaProperties = append(toolInputSchemaProperties, mcp.WithDescription(description))
Expand All @@ -441,6 +558,7 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
OutputTopic: outputTopic,
Tool: tool,
SchemaFetchSuccess: schemaFetchSuccess,
SchemaFetchError: schemaFetchErr,
}, nil
}

Expand Down
Loading
Loading