Separate Workload Manager Interfaces for CLI and Kubernetes
Problem Statement
The current unified workloads.Manager interface forces Kubernetes implementations to use core.Workload, which is designed for container/runtime workloads. This creates several issues:
- Conceptual Mismatch: core.Workload includes container-specific concepts (Port, runtime.WorkloadStatus, Remote flag) that don't map cleanly to Kubernetes MCPServer CRDs
- Unnecessary Coupling: The Kubernetes implementation depends on runtime abstractions (pkg/container/runtime) even though it works directly with CRDs
- Forced Conversions: Complex mapping logic is required to convert MCPServer CRDs to core.Workload format
- Limited Flexibility: Kubernetes-specific features (annotations, conditions, phase) can't be exposed without polluting the shared interface
Current State
The codebase currently has a workloads.Manager interface that works well for CLI workloads (containers). There is a pending PR that extends this to also support Kubernetes workloads using the same unified interface.
Current CLI Interface (in codebase):
// Existing Manager interface - works well for CLI
type Manager interface {
GetWorkload(ctx context.Context, workloadName string) (core.Workload, error)
ListWorkloads(ctx context.Context, listAll bool, labelFilters ...string) ([]core.Workload, error)
// ... other methods for container workloads
}
Pending PR Approach:
The pending PR extends this to support Kubernetes by having k8sManager also implement Manager and return core.Workload.
Issues with extending the unified approach to Kubernetes:
- k8sManager.mcpServerToWorkload() must map MCPServer CRD → core.Workload (see the sketch after this list)
- Kubernetes-specific fields (MCPServerPhase, Conditions) are lost in conversion
- Depends on pkg/container/runtime for the WorkloadStatus enum
- Container concepts (Port, Remote) don't apply to CRDs but must be handled
- Forces the Kubernetes implementation to work with container/runtime abstractions
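To make the mismatch concrete, the conversion forced by the unified approach would look roughly like the sketch below. This is illustrative only: the core.Workload field names, the import paths, and the helper signature are assumptions for the example, not code taken from the pending PR.
// Hypothetical sketch - core.Workload field names and import paths are assumed.
package kubernetes

import (
	mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"

	"github.com/stacklok/toolhive/pkg/container/runtime" // the coupling this proposal wants to remove
	"github.com/stacklok/toolhive/pkg/core"
)

// mcpServerToWorkload squeezes an MCPServer CRD into the container-oriented
// core.Workload shape. Kubernetes-specific data (Phase, Conditions, namespace,
// annotations) has nowhere to go and is silently dropped.
func mcpServerToWorkload(server *mcpv1alpha1.MCPServer, status runtime.WorkloadStatus) core.Workload {
	return core.Workload{
		Name:   server.Name, // assumed field name
		Port:   0,           // container concept; meaningless for a CRD
		Remote: false,       // container concept; meaningless for a CRD
		Status: status,      // caller must first collapse the Phase into the runtime enum
	}
}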
Proposed Approach
Instead of extending the existing workloads.Manager interface to Kubernetes, we propose creating a separate interface for Kubernetes workloads while keeping the existing CLI interface unchanged. This ensures:
- Zero impact on CLI code: All existing CLI commands continue to work without modification
- CLI keeps existing interface: The current workloads.Manager interface continues to work for CLI workloads (containers)
- Kubernetes gets a dedicated interface: K8SManager returns k8s.Workload (a Kubernetes-specific model) instead of forcing core.Workload
- Better separation: The Kubernetes implementation doesn't need to depend on container/runtime abstractions
Architecture
┌─────────────────────────────────────────────────────────────┐
│ workloads Package │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ CLIManager │ │ K8SManager │ │
│ │ Interface │ │ Interface │ │
│ └──────────────────┘ └──────────────────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ core.Workload │ │ k8s.Workload │ │
│ │ (container) │ │ (MCPServer CRD) │ │
│ └──────────────────┘ └──────────────────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Container │ │ MCPServer CRD │ │
│ │ Runtime │ │ (Kubernetes) │ │
│ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Interface Definitions
CLI Manager Interface (Existing in Codebase)
Important: The existing CLI interface in the codebase remains unchanged. All existing CLI code continues to work without any modifications.
// Manager is the existing interface in the codebase
// This interface works perfectly for CLI workloads (containers) - no changes needed
type Manager interface {
// GetWorkload retrieves details of the named workload including its status.
GetWorkload(ctx context.Context, workloadName string) (core.Workload, error)
// ListWorkloads retrieves the states of all workloads.
// The `listAll` parameter determines whether to include workloads that are not running.
// The optional `labelFilters` parameter allows filtering workloads by labels (format: key=value).
ListWorkloads(ctx context.Context, listAll bool, labelFilters ...string) ([]core.Workload, error)
// DeleteWorkloads deletes the specified workloads by name.
// It is implemented as an asynchronous operation which returns an errgroup.Group
DeleteWorkloads(ctx context.Context, names []string) (*errgroup.Group, error)
// StopWorkloads stops the specified workloads by name.
// It is implemented as an asynchronous operation which returns an errgroup.Group
StopWorkloads(ctx context.Context, names []string) (*errgroup.Group, error)
// RunWorkload runs a container in the foreground.
RunWorkload(ctx context.Context, runConfig *runner.RunConfig) error
// RunWorkloadDetached runs a container in the background.
RunWorkloadDetached(ctx context.Context, runConfig *runner.RunConfig) error
// RestartWorkloads restarts the specified workloads by name.
// It is implemented as an asynchronous operation which returns an errgroup.Group
RestartWorkloads(ctx context.Context, names []string, foreground bool) (*errgroup.Group, error)
// UpdateWorkload updates a workload by stopping, deleting, and recreating it.
// It is implemented as an asynchronous operation which returns an errgroup.Group
UpdateWorkload(ctx context.Context, workloadName string, newConfig *runner.RunConfig) (*errgroup.Group, error)
// GetLogs retrieves the logs of a container.
GetLogs(ctx context.Context, containerName string, follow bool) (string, error)
// GetProxyLogs retrieves the proxy logs from the filesystem.
GetProxyLogs(ctx context.Context, workloadName string) (string, error)
// MoveToGroup moves the specified workloads from one group to another by updating their runconfig.
MoveToGroup(ctx context.Context, workloadNames []string, groupFrom string, groupTo string) error
// ListWorkloadsInGroup returns all workload names that belong to the specified group, including stopped workloads.
ListWorkloadsInGroup(ctx context.Context, groupName string) ([]string, error)
// DoesWorkloadExist checks if a workload with the given name exists.
DoesWorkloadExist(ctx context.Context, workloadName string) (bool, error)
}
CLI Implementation: The existing cliManager implementation in the codebase continues to work as-is; no changes to the CLI implementation code are needed.
Kubernetes Manager Interface
// K8SManager manages MCPServer CRD workloads in Kubernetes
type K8SManager interface {
// GetWorkload retrieves an MCPServer CRD
GetWorkload(ctx context.Context, workloadName string) (*k8s.Workload, error)
// ListWorkloads lists MCPServer CRDs
ListWorkloads(ctx context.Context, listAll bool, labelFilters ...string) ([]*k8s.Workload, error)
// Read-only operations (lifecycle managed by operator)
// Note: No Run/Stop/Delete - these are handled by applying/updating CRDs
// Group operations
MoveToGroup(ctx context.Context, workloadNames []string, groupFrom string, groupTo string) error
ListWorkloadsInGroup(ctx context.Context, groupName string) ([]string, error)
DoesWorkloadExist(ctx context.Context, workloadName string) (bool, error)
// Kubernetes-specific operations
GetMCPServer(ctx context.Context, workloadName string) (*mcpv1alpha1.MCPServer, error)
ListMCPServers(ctx context.Context, labelFilters ...string) ([]*mcpv1alpha1.MCPServer, error)
}
New Domain Model for Kubernetes
// Package k8s provides Kubernetes-specific workload domain model
package k8s
import (
"time"

mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)
// Workload represents a Kubernetes MCPServer workload
type Workload struct {
// Name is the MCPServer name
Name string
// Namespace where the MCPServer is deployed
Namespace string
// URL is the service URL for accessing the workload
URL string
// Phase is the MCPServer phase (Running, Pending, Failed, Terminating)
Phase mcpv1alpha1.MCPServerPhase
// Conditions are the MCPServer status conditions
Conditions []mcpv1alpha1.MCPServerCondition
// Group is the group reference from spec.groupRef
Group string
// Transport is the transport type from spec.transport
Transport string
// ProxyPort is the proxy port from spec.proxyPort
ProxyPort int
// Labels from MCPServer metadata
Labels map[string]string
// Annotations from MCPServer metadata
Annotations map[string]string
// CreatedAt is the creation timestamp
CreatedAt time.Time
}
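As a rough sketch, a K8SManager implementation could populate this model directly from the CRD, continuing the k8s package above. The exact Go field names on the CRD's Spec and Status (Transport, ProxyPort, GroupRef, URL) are assumptions based on the spec/status fields referenced in the comments, and the helper name is hypothetical:
// workloadFromMCPServer builds the Kubernetes-native model straight from the
// CRD, with no detour through core.Workload or runtime abstractions.
// Sketch only: Spec/Status field names are assumed, and URL resolution may
// differ in the real implementation.
func workloadFromMCPServer(server *mcpv1alpha1.MCPServer) *Workload {
	return &Workload{
		Name:        server.Name,
		Namespace:   server.Namespace,
		URL:         server.Status.URL, // assumed; could also be derived from the Service
		Phase:       server.Status.Phase,
		Conditions:  server.Status.Conditions,
		Group:       server.Spec.GroupRef, // assumed field name for spec.groupRef
		Transport:   string(server.Spec.Transport),
		ProxyPort:   int(server.Spec.ProxyPort),
		Labels:      server.Labels,
		Annotations: server.Annotations,
		CreatedAt:   server.CreationTimestamp.Time,
	}
}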
Factory Functions
Important: NewManager() continues to return the Manager interface to maintain CLI compatibility. Kubernetes code should use NewK8SManager() directly.
// NewCLIManager creates a CLI workload manager
// Returns Manager interface (same as today)
func NewCLIManager(ctx context.Context) (Manager, error) { ... }
// NewK8SManager creates a Kubernetes workload manager
// Returns K8SManager interface (new, separate interface)
// This should be called directly by Kubernetes-specific code
func NewK8SManager(ctx context.Context) (K8SManager, error) { ... }
// NewManager returns a CLI workload manager
// Returns Manager interface (existing behavior, unchanged)
// IMPORTANT: This function only works in CLI mode. For Kubernetes, use NewK8SManager() directly.
func NewManager(ctx context.Context) (Manager, error) {
if rt.IsKubernetesRuntime() {
// In Kubernetes mode, return error or panic to force explicit K8SManager usage
// OR: Return a Manager that wraps K8SManager with adapter (not recommended)
return nil, fmt.Errorf("use workloads.NewK8SManager() for Kubernetes environments")
}
return NewCLIManager(ctx) // Returns Manager (unchanged)
}
// NewManagerFromRuntime continues to work for CLI (unchanged)
func NewManagerFromRuntime(rtRuntime rt.Runtime) (Manager, error) {
if rt.IsKubernetesRuntime() {
// In Kubernetes mode, runtime-based creation doesn't apply
return nil, fmt.Errorf("use workloads.NewK8SManager() for Kubernetes environments")
}
return NewCLIManagerFromRuntime(rtRuntime) // Returns Manager (unchanged)
}
Why This Approach:
- CLI Compatibility: NewManager() continues to return the Manager interface, so all existing CLI code works without changes
- Explicit Kubernetes Usage: Kubernetes code must explicitly call NewK8SManager(), making it clear that it is using a different interface
- Type Safety: No interface{} return type that breaks existing code
- Clear Separation: CLI and Kubernetes code paths are clearly separated
Note: The error returned in Kubernetes mode forces explicit usage of NewK8SManager(), making the code path clear and preventing accidental use of the wrong interface.
Backward Compatibility:
- All existing CLI code using workloads.NewManager(ctx) continues to work exactly as it does today
- CLI code gets the Manager interface (existing behavior)
- Kubernetes code explicitly uses NewK8SManager() to get the K8SManager interface (see the sketch after this list)
- The existing CLI implementation in the codebase remains unchanged
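For example, Kubernetes-specific code would opt into the new interface explicitly, roughly as sketched below. It assumes the proposed NewK8SManager and k8s.Workload definitions above; the package name and the import path for the workloads package are assumptions.
// Sketch: a Kubernetes-only code path using the dedicated interface directly.
package example

import (
	"context"
	"fmt"

	"github.com/stacklok/toolhive/pkg/workloads" // assumed import path
)

func printMCPServerWorkloads(ctx context.Context) error {
	mgr, err := workloads.NewK8SManager(ctx)
	if err != nil {
		return fmt.Errorf("failed to create Kubernetes workload manager: %w", err)
	}

	// ListWorkloads returns k8s.Workload values, so Kubernetes-specific fields
	// such as Namespace and Phase are available without any conversion.
	wls, err := mgr.ListWorkloads(ctx, true)
	if err != nil {
		return fmt.Errorf("failed to list MCPServer workloads: %w", err)
	}
	for _, wl := range wls {
		fmt.Printf("%s/%s: phase=%s transport=%s\n", wl.Namespace, wl.Name, wl.Phase, wl.Transport)
	}
	return nil
}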
Separate Discoverers for vmcp Aggregator
Following the same pattern as groups.Manager, we'll create separate discoverer implementations:
Structure:
pkg/vmcp/aggregator/
├── aggregator.go # BackendDiscoverer interface
├── discoverer.go # Factory function (selects CLI or K8s discoverer)
├── cli_discoverer.go # CLI implementation (works with Manager + core.Workload)
└── k8s_discoverer.go # Kubernetes implementation (works with K8SManager + k8s.Workload)
Factory Function:
// discoverer.go - Factory function (similar to groups/manager.go)
// Accepts interface{} to handle both Manager and K8SManager
// Type assertion happens once in factory, not in discovery logic
func NewBackendDiscoverer(
workloadsManager interface{}, // Can be Manager or K8SManager
groupsManager groups.Manager,
authConfig *config.OutgoingAuthConfig,
) (BackendDiscoverer, error) {
if rt.IsKubernetesRuntime() {
k8sMgr, ok := workloadsManager.(workloads.K8SManager)
if !ok {
return nil, fmt.Errorf("expected workloads.K8SManager in Kubernetes mode")
}
return NewK8SBackendDiscoverer(k8sMgr, groupsManager, authConfig), nil
}
cliMgr, ok := workloadsManager.(workloads.Manager)
if !ok {
return nil, fmt.Errorf("expected workloads.Manager in CLI mode")
}
return NewCLIBackendDiscoverer(cliMgr, groupsManager, authConfig), nil
}
Usage in vmcp:
// cmd/vmcp/app/commands.go
var workloadsManager interface{}
if rt.IsKubernetesRuntime() {
workloadsManager, err = workloads.NewK8SManager(ctx) // Returns K8SManager
} else {
workloadsManager, err = workloads.NewManager(ctx) // Returns Manager
}
discoverer, err := aggregator.NewBackendDiscoverer(workloadsManager, groupsManager, cfg.OutgoingAuth)
CLI Discoverer:
// cli_discoverer.go - CLI implementation
type cliBackendDiscoverer struct {
workloadsManager workloads.Manager // Type-safe: Manager interface
groupsManager groups.Manager
authConfig *config.OutgoingAuthConfig
}
func NewCLIBackendDiscoverer(
workloadsManager workloads.Manager,
groupsManager groups.Manager,
authConfig *config.OutgoingAuthConfig,
) BackendDiscoverer {
return &cliBackendDiscoverer{
workloadsManager: workloadsManager,
groupsManager: groupsManager,
authConfig: authConfig,
}
}
func (d *cliBackendDiscoverer) Discover(ctx context.Context, groupRef string) ([]vmcp.Backend, error) {
// Works with core.Workload - fully type-safe, no assertions needed
workload, err := d.workloadsManager.GetWorkload(ctx, name)
// Convert core.Workload → vmcp.Backend
}
Kubernetes Discoverer:
// k8s_discoverer.go - Kubernetes implementation
type k8sBackendDiscoverer struct {
workloadsManager workloads.K8SManager // Type-safe: K8SManager interface
groupsManager groups.Manager
authConfig *config.OutgoingAuthConfig
}
func NewK8SBackendDiscoverer(
workloadsManager workloads.K8SManager,
groupsManager groups.Manager,
authConfig *config.OutgoingAuthConfig,
) BackendDiscoverer {
return &k8sBackendDiscoverer{
workloadsManager: workloadsManager,
groupsManager: groupsManager,
authConfig: authConfig,
}
}
func (d *k8sBackendDiscoverer) Discover(ctx context.Context, groupRef string) ([]vmcp.Backend, error) {
// Works with k8s.Workload - fully type-safe, no assertions needed
workload, err := d.workloadsManager.GetWorkload(ctx, name)
// Convert k8s.Workload → vmcp.Backend
}
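The conversion mentioned in the comment above is then a plain function over the Kubernetes model. Because this proposal doesn't spell out the vmcp.Backend fields, the sketch below uses a stand-in struct with purely illustrative field names, and the import path for the k8s package is assumed.
// Sketch only: backendRecord stands in for vmcp.Backend, whose real fields are
// not shown in this proposal.
package aggregator

import "github.com/stacklok/toolhive/pkg/workloads/k8s" // assumed import path

type backendRecord struct {
	Name      string
	BaseURL   string
	Transport string
	Labels    map[string]string
}

// k8sWorkloadToBackend maps the Kubernetes domain model directly to a backend
// record - no container/runtime concepts involved.
func k8sWorkloadToBackend(w *k8s.Workload) backendRecord {
	return backendRecord{
		Name:      w.Name,
		BaseURL:   w.URL,
		Transport: w.Transport,
		Labels:    w.Labels,
	}
}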
Benefits:
- ✅ Type Safety: Each discoverer uses the correct interface and workload type
- ✅ No Runtime Checks: Factory selects once; no conditionals in discovery logic
- ✅ Clear Separation: Platform-specific code is isolated
- ✅ Consistent Pattern: Matches the pkg/groups structure (cli_manager.go + crd_manager.go)
- ✅ Easier Testing: Each discoverer can be tested independently
- ✅ Platform-Specific Optimizations: Each can be optimized for its platform
Benefits
- Zero CLI Impact: All existing CLI code continues to work without modification
- No Runtime Coupling: The Kubernetes implementation doesn't depend on pkg/container/runtime
- Domain-Specific Models: Kubernetes uses models that match MCPServer CRD concepts
- Preserve Kubernetes Features: Can expose MCPServer phases, conditions, annotations
- Clearer Separation: Platform-specific code is clearly separated
- Type Safety: Compile-time guarantees about which operations are available
- Easier Testing: Can test Kubernetes logic without container runtime mocks
- Backward Compatible: CLI code path remains unchanged
Migration Strategy
- Phase 1: Create the K8SManager interface and k8s.Workload type alongside the existing Manager interface
- Phase 2: Implement Kubernetes support using the K8SManager interface (no CLI changes)
- Phase 3: Split the discoverer into separate implementations:
  - Create cli_discoverer.go (moves existing logic, works with Manager + core.Workload)
  - Create k8s_discoverer.go (new implementation, works with K8SManager + k8s.Workload)
  - Update discoverer.go to be a factory function (like groups/manager.go)
- Phase 4: Update any Kubernetes-specific code to use K8SManager directly
Important:
- CLI code requires zero changes. All existing CLI commands, APIs, and internal code continue to use workloads.NewManager(ctx) and get the Manager interface as they do today.
- The vmcp aggregator requires minimal changes: only the factory function needs to check the runtime and call the appropriate NewManager() or NewK8SManager().
- The NewBackendDiscoverer() factory automatically selects the right discoverer based on the manager type passed to it.
Example: vmcp Backend Discovery
Usage (minimal changes in vmcp factory):
// cmd/vmcp/app/commands.go - Small change to handle both types
var workloadsManager interface{}
if rt.IsKubernetesRuntime() {
workloadsManager, err = workloads.NewK8SManager(ctx) // Returns K8SManager
} else {
workloadsManager, err = workloads.NewManager(ctx) // Returns Manager
}
if err != nil {
return fmt.Errorf("failed to create workloads manager: %w", err)
}
discoverer, err := aggregator.NewBackendDiscoverer(workloadsManager, groupsManager, cfg.OutgoingAuth)
if err != nil {
return fmt.Errorf("failed to create backend discoverer: %w", err)
}
backends, err := discoverer.Discover(ctx, cfg.Group)
Internal Implementation:
// Factory automatically selects the right discoverer
func NewBackendDiscoverer(...) (BackendDiscoverer, error) {
if rt.IsKubernetesRuntime() {
return NewK8SBackendDiscoverer(...), nil // Returns k8sBackendDiscoverer
}
return NewCLIBackendDiscoverer(...), nil // Returns cliBackendDiscoverer
}
// CLI discoverer - type-safe with Manager interface
func (d *cliBackendDiscoverer) Discover(ctx context.Context, groupRef string) ([]vmcp.Backend, error) {
workloadNames, err := d.workloadsManager.ListWorkloadsInGroup(ctx, groupRef)
for _, name := range workloadNames {
workload, err := d.workloadsManager.GetWorkload(ctx, name) // Returns core.Workload
backend := cliWorkloadToBackend(workload) // Direct conversion
backends = append(backends, backend)
}
}
// Kubernetes discoverer - type-safe with K8SManager interface
func (d *k8sBackendDiscoverer) Discover(ctx context.Context, groupRef string) ([]vmcp.Backend, error) {
workloadNames, err := d.workloadsManager.ListWorkloadsInGroup(ctx, groupRef)
for _, name := range workloadNames {
workload, err := d.workloadsManager.GetWorkload(ctx, name) // Returns k8s.Workload
backend := k8sWorkloadToBackend(workload) // Direct conversion
backends = append(backends, backend)
}
}
CLI Code Compatibility Guarantee
All existing CLI code in the codebase remains unchanged:
- ✅ cmd/thv/app/*.go - All CLI commands continue to use workloads.NewManager(ctx) and get the Manager interface (existing behavior)
- ✅ pkg/api/v1/*.go - API code continues to use the Manager interface (existing behavior)
- ✅ pkg/runner/runner.go - Runner code continues to work with the Manager interface (existing behavior)
- ✅ All code using core.Workload continues to work (existing behavior)
Only new/changed code:
- Kubernetes-specific code uses the K8SManager interface (new, separate interface)
- The vmcp aggregator uses separate discoverers (cli_discoverer.go and k8s_discoverer.go) following the groups pattern
- The Kubernetes implementation works directly with MCPServer CRDs without converting to core.Workload
Recommendation
Proceed with the separate-interfaces approach. This provides:
- Better separation of concerns
- No artificial coupling between Kubernetes and runtime abstractions
- Ability to expose platform-specific features
- Clearer code organization
The migration can be done incrementally, and we can provide adapter functions where needed for backward compatibility.
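One lightweight form such an adapter can take is a small consumer-side interface covering just the methods the two proposed interfaces share (ListWorkloadsInGroup and DoesWorkloadExist have identical signatures in both), so code that only cares about group membership can accept either manager. A minimal sketch under that assumption; the interface and function names are hypothetical:
// Sketch: a consumer-side subset that both the existing Manager and the
// proposed K8SManager satisfy structurally.
package example

import (
	"context"
	"fmt"
)

type groupLister interface {
	ListWorkloadsInGroup(ctx context.Context, groupName string) ([]string, error)
	DoesWorkloadExist(ctx context.Context, workloadName string) (bool, error)
}

// workloadsInGroup works against either manager without caring which platform
// it is talking to.
func workloadsInGroup(ctx context.Context, mgr groupLister, group string) ([]string, error) {
	names, err := mgr.ListWorkloadsInGroup(ctx, group)
	if err != nil {
		return nil, fmt.Errorf("failed to list workloads in group %q: %w", group, err)
	}
	return names, nil
}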