Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ tidy: ## Run go mod tidy to ensure dependencies are up to date.

.PHONY: test
test: build lint ## Run all tests with build, lint, and coverage
go test -tags=test -v -cover ./pkg/... ./internal/...
go test -tags=test -v -cover ./pkg/... ./internal/... ./cmd/...

.PHONY: test-only
test-only: ## Run tests only (without build/lint for faster iteration)
go test -tags=test -v -cover ./pkg/... ./internal/...
go test -tags=test -v -cover ./pkg/... ./internal/... ./cmd/...

.PHONY: e2e
e2e: test retag
Expand Down
134 changes: 30 additions & 104 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/joho/godotenv"
"github.com/kagent-dev/tools/internal/logger"
mcpserver "github.com/kagent-dev/tools/internal/mcp"
"github.com/kagent-dev/tools/internal/metrics"
"github.com/kagent-dev/tools/internal/telemetry"
"github.com/kagent-dev/tools/internal/version"
Expand All @@ -33,8 +34,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
sdkmcp "github.com/modelcontextprotocol/go-sdk/mcp"
)

var (
Expand Down Expand Up @@ -140,16 +140,14 @@ func run(cmd *cobra.Command, args []string) {
logger.Get().Info("Running in read-only mode - write operations are disabled")
}

mcp := server.NewMCPServer(
Name,
Version,
)
mcpSrv := sdkmcp.NewServer(&sdkmcp.Implementation{Name: Name, Version: Version}, nil)

// Attach a single receiving middleware that instruments every tools/call
// with an OTel span and Prometheus invocation counters. Per-tool provider
// labels are recorded as each provider registers its tools.
mcpSrv.AddReceivingMiddleware(mcpserver.ToolMiddleware())

// Register tools and wrap handlers with metrics instrumentation.
// registerMCP returns a map of tool_name -> tool_provider so that
// wrapToolHandlersWithMetrics knows which provider each tool belongs to.
toolProviders := registerMCP(mcp, tools, *kubeconfig, readOnly)
wrapToolHandlersWithMetrics(mcp, toolProviders)
registerMCP(mcpSrv, tools, *kubeconfig, readOnly)

// Create wait group for server goroutines
var wg sync.WaitGroup
Expand All @@ -167,11 +165,12 @@ func run(cmd *cobra.Command, args []string) {
if stdio {
go func() {
defer wg.Done()
runStdioServer(ctx, mcp)
runStdioServer(ctx, mcpSrv)
}()
} else {
sseServer := server.NewStreamableHTTPServer(mcp,
server.WithHeartbeatInterval(30*time.Second),
sseServer := sdkmcp.NewStreamableHTTPHandler(
func(*http.Request) *sdkmcp.Server { return mcpSrv },
nil,
)

// Create a mux to handle different routes
Expand Down Expand Up @@ -293,29 +292,27 @@ func writeResponse(w http.ResponseWriter, data []byte) error {
return err
}

func runStdioServer(ctx context.Context, mcp *server.MCPServer) {
func runStdioServer(ctx context.Context, mcpSrv *sdkmcp.Server) {
logger.Get().Info("Running KAgent Tools Server STDIO:", "tools", strings.Join(tools, ","))
stdioServer := server.NewStdioServer(mcp)
if err := stdioServer.Listen(ctx, os.Stdin, os.Stdout); err != nil {
if err := mcpSrv.Run(ctx, &sdkmcp.StdioTransport{}); err != nil {
logger.Get().Info("Stdio server stopped", "error", err)
}
}

// registerMCP registers tool providers with the MCP server and returns a mapping
// of tool_name -> tool_provider. This mapping is built using the ListTools() diff
// technique: we snapshot the tool list before and after each provider registers,
// so we know exactly which tools belong to which provider.
func registerMCP(mcp *server.MCPServer, enabledToolProviders []string, kubeconfig string, readOnly bool) map[string]string {
// A map to hold tool providers and their registration functions
toolProviderMap := map[string]func(*server.MCPServer){
"argo": func(s *server.MCPServer) { argo.RegisterTools(s, readOnly) },
"cilium": func(s *server.MCPServer) { cilium.RegisterTools(s, readOnly) },
"helm": func(s *server.MCPServer) { helm.RegisterTools(s, readOnly) },
"istio": func(s *server.MCPServer) { istio.RegisterTools(s, readOnly) },
"k8s": func(s *server.MCPServer) { k8s.RegisterTools(s, nil, kubeconfig, readOnly) },
"kubescape": func(s *server.MCPServer) { kubescape.RegisterTools(s, kubeconfig, readOnly) },
"prometheus": func(s *server.MCPServer) { prometheus.RegisterTools(s, readOnly) },
"utils": func(s *server.MCPServer) { utils.RegisterTools(s, readOnly) },
// registerMCP registers the enabled tool providers with the MCP server. Each
// provider's RegisterTools call records tool->provider mappings and the tool
// inventory metric centrally (see internal/mcp.AddTool); invocation metrics and
// tracing are applied by the receiving middleware installed in run().
func registerMCP(mcpSrv *sdkmcp.Server, enabledToolProviders []string, kubeconfig string, readOnly bool) {
toolProviderMap := map[string]func(*sdkmcp.Server){
"argo": func(s *sdkmcp.Server) { argo.RegisterTools(s, readOnly) },
"cilium": func(s *sdkmcp.Server) { cilium.RegisterTools(s, readOnly) },
"helm": func(s *sdkmcp.Server) { helm.RegisterTools(s, readOnly) },
"istio": func(s *sdkmcp.Server) { istio.RegisterTools(s, readOnly) },
"k8s": func(s *sdkmcp.Server) { k8s.RegisterTools(s, nil, kubeconfig, readOnly) },
"kubescape": func(s *sdkmcp.Server) { kubescape.RegisterTools(s, kubeconfig, readOnly) },
"prometheus": func(s *sdkmcp.Server) { prometheus.RegisterTools(s, readOnly) },
"utils": func(s *sdkmcp.Server) { utils.RegisterTools(s, readOnly) },
}

// If no specific tools are specified, register all available tools.
Expand All @@ -325,82 +322,11 @@ func registerMCP(mcp *server.MCPServer, enabledToolProviders []string, kubeconfi
}
}

// toolToProvider maps each tool name to its provider (e.g., "kubectl_get" -> "k8s").
// This is used later by wrapToolHandlersWithMetrics to set the correct tool_provider label.
toolToProvider := make(map[string]string)

for _, toolProviderName := range enabledToolProviders {
if registerFunc, ok := toolProviderMap[toolProviderName]; ok {
// Snapshot the tool list before this provider registers its tools.
// We need this because ListTools() returns ALL tools from ALL providers,
// so the only way to know which tools belong to THIS provider is to compare
// the list before and after registration.
toolsBefore := mcp.ListTools()

registerFunc(mcp)

// Determine which tools were just registered by this provider
// by finding tools that exist now but didn't exist before.
// Record each one in Prometheus so we can observe the full tool inventory.
for toolName := range mcp.ListTools() {
if _, existed := toolsBefore[toolName]; !existed {
metrics.KagentToolsMCPRegisteredTools.WithLabelValues(toolName, toolProviderName).Set(1)
toolToProvider[toolName] = toolProviderName
}
}
registerFunc(mcpSrv)
} else {
logger.Get().Error("Unknown tool specified", "provider", toolProviderName)
}
}

return toolToProvider
}

// wrapToolHandlersWithMetrics applies the wrapper/middleware pattern to instrument
// all registered MCP tool handlers with Prometheus invocation counters.
//
// How it works:
// 1. Grab all registered tools from the MCP server using ListTools()
// 2. For each tool, wrap its handler with a function that increments metrics
// 3. Replace all tools in the MCP server using SetTools()
//
// The wrapper function:
// - Increments kagent_tools_mcp_invocations_total on every call
// - Increments kagent_tools_mcp_invocations_failure_total when the handler returns a
// non-nil Go error OR when result.IsError is true (the MCP convention for tool-level
// failures - handlers return NewToolResultError(...), nil, not a Go error)
// - Calls the original handler unchanged - the tool's behaviour is not affected
//
// This uses the standard middleware/decorator pattern: the original handler and the
// wrapped handler have the same function signature, so they are interchangeable.
// No changes are required in any pkg/ file - all instrumentation happens centrally here.
func wrapToolHandlersWithMetrics(mcpServer *server.MCPServer, toolToProvider map[string]string) {
allTools := mcpServer.ListTools()
wrapped := make([]server.ServerTool, 0, len(allTools))

for name, st := range allTools {
originalHandler := st.Handler
toolName := name // capture for closure
provider := toolToProvider[toolName]

wrapped = append(wrapped, server.ServerTool{
Tool: st.Tool,
Handler: func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
metrics.KagentToolsMCPInvocationsTotal.WithLabelValues(toolName, provider).Inc()

result, err := originalHandler(ctx, req)

// Count as failure if the Go error is non-nil OR if the tool returned
// a result with IsError=true (the MCP convention for tool-level failures,
// which always return nil for the Go error).
if err != nil || (result != nil && result.IsError) {
metrics.KagentToolsMCPInvocationsFailureTotal.WithLabelValues(toolName, provider).Inc()
}

return result, err
},
})
}

mcpServer.SetTools(wrapped...)
}
127 changes: 0 additions & 127 deletions cmd/metrics_wrap_test.go

This file was deleted.

Loading
Loading