Changes from all commits
20 commits
fb159b1 - unify workload management across CLI and Kubernetes (amirejaz, Nov 6, 2025)
480db30 - removed unnecessary files (amirejaz, Nov 6, 2025)
0b09d38 - unified workload with separate workloads for cli and k8s (amirejaz, Nov 10, 2025)
030368d - merged remote main branch into local (amirejaz, Nov 11, 2025)
068e2ce - refactor the constructor and fix tests (amirejaz, Nov 11, 2025)
c58ce7c - Merge remote-tracking branch 'origin/main' into unified-workload-manager (amirejaz, Nov 11, 2025)
31e40e7 - adds more tests (amirejaz, Nov 11, 2025)
2c921c7 - fixed thv listing (amirejaz, Nov 11, 2025)
1d6d807 - checks the kubernetes client runtime instead of the environment (amirejaz, Nov 12, 2025)
bec49a2 - Merge remote-tracking branch 'origin/main' into unified-workload-manager (amirejaz, Nov 12, 2025)
26a37ad - fix e2e tests (amirejaz, Nov 12, 2025)
9dd801e - Merge remote-tracking branch 'origin/main' into unified-workload-manager (amirejaz, Nov 12, 2025)
f3fd09f - improves test coverage (amirejaz, Nov 12, 2025)
846fe38 - Merge remote-tracking branch 'origin/main' into unified-workload-manager (amirejaz, Nov 12, 2025)
b70ac65 - refactor the k8s manager into separate package (amirejaz, Nov 12, 2025)
dd7c0ed - removed logs fns (amirejaz, Nov 12, 2025)
d121268 - Merge remote-tracking branch 'origin/main' into unified-workload-manager (amirejaz, Nov 13, 2025)
0608b7a - use pkg/k8s package for client and namespace (amirejaz, Nov 13, 2025)
ed6b6d9 - moved discoverer creation to factory inside pkg (amirejaz, Nov 14, 2025)
152a921 - Merge remote-tracking branch 'origin/main' into unified-workload-manager (amirejaz, Nov 14, 2025)
23 changes: 6 additions & 17 deletions cmd/vmcp/app/commands.go
@@ -20,7 +20,6 @@ import (
"github.com/stacklok/toolhive/pkg/vmcp/discovery"
vmcprouter "github.com/stacklok/toolhive/pkg/vmcp/router"
vmcpserver "github.com/stacklok/toolhive/pkg/vmcp/server"
"github.com/stacklok/toolhive/pkg/workloads"
)

var rootCmd = &cobra.Command{
@@ -225,31 +224,21 @@ func discoverBackends(ctx context.Context, cfg *config.Config) ([]vmcp.Backend,

// Initialize managers for backend discovery
logger.Info("Initializing workload and group managers")
- workloadsManager, err := workloads.NewManager(ctx)
groupsManager, err := groups.NewManager()
if err != nil {
- logger.Warnf("Failed to create workloads manager (expected in Kubernetes): %v", err)
- logger.Warnf("Backend discovery will be skipped - continuing with empty backend list")
- return []vmcp.Backend{}, backendClient, nil
return nil, nil, fmt.Errorf("failed to create groups manager: %w", err)
}

- groupsManager, err := groups.NewManager()
// Create backend discoverer based on runtime environment
discoverer, err := aggregator.NewBackendDiscoverer(ctx, groupsManager, cfg.OutgoingAuth)
if err != nil {
- logger.Warnf("Failed to create groups manager (expected in Kubernetes): %v", err)
- logger.Warnf("Backend discovery will be skipped - continuing with empty backend list")
- return []vmcp.Backend{}, backendClient, nil
return nil, nil, fmt.Errorf("failed to create backend discoverer: %w", err)
}

- // Create backend discoverer and discover backends
- discoverer := aggregator.NewCLIBackendDiscoverer(workloadsManager, groupsManager, cfg.OutgoingAuth)
-
logger.Infof("Discovering backends in group: %s", cfg.Group)
backends, err := discoverer.Discover(ctx, cfg.Group)
if err != nil {
- // Handle discovery errors gracefully - this is expected in Kubernetes
- logger.Warnf("CLI backend discovery failed (likely running in Kubernetes): %v", err)
- logger.Warnf("Kubernetes backend discovery is not yet implemented - continuing with empty backend list")
- logger.Warnf("The vmcp server will start but won't proxy any backends until this feature is implemented")
- return []vmcp.Backend{}, backendClient, nil
return nil, nil, fmt.Errorf("failed to discover backends: %w", err)
}

if len(backends) == 0 {
2 changes: 2 additions & 0 deletions pkg/vmcp/aggregator/cli_discoverer.go
@@ -25,6 +25,8 @@ type cliBackendDiscoverer struct {
//
// The authConfig parameter configures authentication for discovered backends.
// If nil, backends will have no authentication configured.
//
// This is the CLI-specific constructor. For Kubernetes workloads, use NewK8SBackendDiscoverer.
func NewCLIBackendDiscoverer(
workloadsManager workloads.Manager,
groupsManager groups.Manager,
20 changes: 20 additions & 0 deletions pkg/vmcp/aggregator/cli_discoverer_test.go
@@ -247,4 +247,24 @@ func TestCLIBackendDiscoverer_Discover(t *testing.T) {
require.Len(t, backends, 1)
assert.Equal(t, "good-workload", backends[0].ID)
})

t.Run("returns error when list workloads fails", func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockWorkloads := workloadmocks.NewMockManager(ctrl)
mockGroups := mocks.NewMockManager(ctrl)

mockGroups.EXPECT().Exists(gomock.Any(), testGroupName).Return(true, nil)
mockWorkloads.EXPECT().ListWorkloadsInGroup(gomock.Any(), testGroupName).
Return(nil, errors.New("failed to list workloads"))

discoverer := NewCLIBackendDiscoverer(mockWorkloads, mockGroups, nil)
backends, err := discoverer.Discover(context.Background(), testGroupName)

require.Error(t, err)
assert.Nil(t, backends)
assert.Contains(t, err.Error(), "failed to list workloads in group")
})
}
44 changes: 44 additions & 0 deletions pkg/vmcp/aggregator/discoverer_factory.go
@@ -0,0 +1,44 @@
package aggregator

import (
"context"
"fmt"

rt "github.com/stacklok/toolhive/pkg/container/runtime"
"github.com/stacklok/toolhive/pkg/groups"
"github.com/stacklok/toolhive/pkg/vmcp/config"
"github.com/stacklok/toolhive/pkg/workloads"
"github.com/stacklok/toolhive/pkg/workloads/k8s"
)

// NewBackendDiscoverer creates a BackendDiscoverer based on the runtime environment.
// It automatically detects whether to use CLI (Docker/Podman) or Kubernetes discoverer
// and creates the appropriate workloads manager.
//
// Parameters:
// - ctx: Context for creating managers
// - groupsManager: Manager for group operations (must already be initialized)
// - authConfig: Outgoing authentication configuration for discovered backends
//
// Returns:
// - BackendDiscoverer: The appropriate discoverer for the current runtime
// - error: If manager creation fails
func NewBackendDiscoverer(
ctx context.Context,
groupsManager groups.Manager,
authConfig *config.OutgoingAuthConfig,
) (BackendDiscoverer, error) {
if rt.IsKubernetesRuntime() {
k8sWorkloadsManager, err := k8s.NewManagerFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes workloads manager: %w", err)
}
return NewK8SBackendDiscoverer(k8sWorkloadsManager, groupsManager, authConfig), nil
}

cliWorkloadsManager, err := workloads.NewManager(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create CLI workloads manager: %w", err)
}
return NewCLIBackendDiscoverer(cliWorkloadsManager, groupsManager, authConfig), nil
}
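For orientation, a minimal usage sketch of this factory (not part of the PR): it mirrors the wiring in the cmd/vmcp/app/commands.go change above. The standalone main package, the placeholder group name "my-group", and the nil outgoing-auth config (which leaves backends without authentication, per the constructor docs) are illustrative assumptions.

package main

import (
	"context"
	"log"

	"github.com/stacklok/toolhive/pkg/groups"
	"github.com/stacklok/toolhive/pkg/vmcp/aggregator"
)

func main() {
	ctx := context.Background()

	// The groups manager is created by the caller; the factory only decides
	// which workloads manager (CLI or Kubernetes) to build internally.
	groupsManager, err := groups.NewManager()
	if err != nil {
		log.Fatalf("failed to create groups manager: %v", err)
	}

	// A nil auth config means discovered backends have no authentication configured.
	discoverer, err := aggregator.NewBackendDiscoverer(ctx, groupsManager, nil)
	if err != nil {
		log.Fatalf("failed to create backend discoverer: %v", err)
	}

	// "my-group" is a placeholder group name for this sketch.
	backends, err := discoverer.Discover(ctx, "my-group")
	if err != nil {
		log.Fatalf("backend discovery failed: %v", err)
	}
	log.Printf("discovered %d backends", len(backends))
}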
163 changes: 145 additions & 18 deletions pkg/vmcp/aggregator/k8s_discoverer.go
@@ -1,33 +1,160 @@
// Package aggregator provides platform-agnostic backend discovery.
// This file contains the Kubernetes-specific discoverer implementation.
package aggregator

import (
"context"
"fmt"

mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
Review comment (Member):
The logic of the k8s discoverer and cli discoverer are very similar. This makes me wonder if we could have a single discoverer type, and create an interface which encapsulates the operation they perform on the underlying runtime. Let's chat through this if you have some time.
(One possible shape for that interface is sketched after this file's diff.)

"github.com/stacklok/toolhive/pkg/groups"
"github.com/stacklok/toolhive/pkg/logger"
"github.com/stacklok/toolhive/pkg/vmcp"
"github.com/stacklok/toolhive/pkg/vmcp/config"
"github.com/stacklok/toolhive/pkg/workloads/k8s"
)

- // k8sBackendDiscoverer discovers backend MCP servers from Kubernetes pods/services in a group.
- // This is the Kubernetes version of BackendDiscoverer (not implemented yet).
// k8sBackendDiscoverer discovers backend MCP servers from Kubernetes workloads (MCPServer CRDs).
// It works with k8s.Manager and k8s.Workload.
type k8sBackendDiscoverer struct {
- // TODO: Add Kubernetes client and group CRD interfaces
workloadsManager k8s.Manager
groupsManager groups.Manager
authConfig *config.OutgoingAuthConfig
}

- // NewK8sBackendDiscoverer creates a new Kubernetes-based backend discoverer.
- // It discovers workloads from Kubernetes MCPServer resources managed by the operator.
- func NewK8sBackendDiscoverer() BackendDiscoverer {
- return &k8sBackendDiscoverer{}
// NewK8SBackendDiscoverer creates a new Kubernetes-based backend discoverer.
// It discovers workloads from MCPServer CRDs managed by the ToolHive operator in Kubernetes.
//
// The authConfig parameter configures authentication for discovered backends.
// If nil, backends will have no authentication configured.
//
// This is the Kubernetes-specific constructor. For CLI workloads, use NewCLIBackendDiscoverer.
func NewK8SBackendDiscoverer(
workloadsManager k8s.Manager,
groupsManager groups.Manager,
authConfig *config.OutgoingAuthConfig,
) BackendDiscoverer {
return &k8sBackendDiscoverer{
workloadsManager: workloadsManager,
groupsManager: groupsManager,
authConfig: authConfig,
}
}

- // Discover finds all backend workloads in the specified Kubernetes group.
- // The groupRef is the MCPGroup name.
- func (*k8sBackendDiscoverer) Discover(_ context.Context, _ string) ([]vmcp.Backend, error) {
- // TODO: Implement Kubernetes backend discovery
- // 1. Query MCPGroup CRD by name
- // 2. List MCPServer resources with matching group label
- // 3. Filter for ready/running MCPServers
- // 4. Build service URLs (http://service-name.namespace.svc.cluster.local:port)
- // 5. Extract transport type from MCPServer spec
- // 6. Return vmcp.Backend list
- return nil, fmt.Errorf("kubernetes backend discovery not yet implemented")
// Discover finds all backend workloads in the specified group.
func (d *k8sBackendDiscoverer) Discover(ctx context.Context, groupRef string) ([]vmcp.Backend, error) {
logger.Infof("Discovering Kubernetes backends in group %s", groupRef)

// Verify that the group exists
exists, err := d.groupsManager.Exists(ctx, groupRef)
if err != nil {
return nil, fmt.Errorf("failed to check if group exists: %w", err)
}
if !exists {
return nil, fmt.Errorf("group %s not found", groupRef)
}

// Get all workload names in the group
workloadNames, err := d.workloadsManager.ListWorkloadsInGroup(ctx, groupRef)
if err != nil {
return nil, fmt.Errorf("failed to list workloads in group: %w", err)
}

if len(workloadNames) == 0 {
logger.Infof("No workloads found in group %s", groupRef)
return []vmcp.Backend{}, nil
}

logger.Debugf("Found %d workloads in group %s, discovering backends", len(workloadNames), groupRef)

// Query each workload and convert to backend
var backends []vmcp.Backend
for _, name := range workloadNames {
workload, err := d.workloadsManager.GetWorkload(ctx, name)
if err != nil {
logger.Warnf("Failed to get workload %s: %v, skipping", name, err)
continue
}

backend := d.convertK8SWorkload(workload, groupRef)
if backend != nil {
backends = append(backends, *backend)
}
}

if len(backends) == 0 {
logger.Infof("No accessible backends found in group %s (all workloads lack URLs)", groupRef)
return []vmcp.Backend{}, nil
}

logger.Infof("Discovered %d backends in group %s", len(backends), groupRef)
return backends, nil
}

// convertK8SWorkload converts a k8s.Workload to a vmcp.Backend.
func (d *k8sBackendDiscoverer) convertK8SWorkload(workload k8s.Workload, groupRef string) *vmcp.Backend {
// Skip workloads without a URL (not accessible)
if workload.URL == "" {
logger.Debugf("Skipping workload %s without URL", workload.Name)
return nil
}

// Map workload phase to backend health status
healthStatus := mapK8SWorkloadPhaseToHealth(workload.Phase)

// Convert k8s.Workload to vmcp.Backend
transportType := workload.ProxyMode
if transportType == "" {
// Fallback to TransportType if ProxyMode is not set (for direct transports)
transportType = workload.TransportType.String()
}

backend := vmcp.Backend{
ID: workload.Name,
Name: workload.Name,
BaseURL: workload.URL,
TransportType: transportType,
HealthStatus: healthStatus,
Metadata: make(map[string]string),
}

// Apply authentication configuration if provided
authStrategy, authMetadata := d.authConfig.ResolveForBackend(workload.Name)
backend.AuthStrategy = authStrategy
backend.AuthMetadata = authMetadata
if authStrategy != "" {
logger.Debugf("Backend %s configured with auth strategy: %s", workload.Name, authStrategy)
}

// Copy user labels to metadata first
for k, v := range workload.Labels {
backend.Metadata[k] = v
}

// Set system metadata (these override user labels to prevent conflicts)
backend.Metadata["group"] = groupRef
backend.Metadata["tool_type"] = workload.ToolType
backend.Metadata["workload_phase"] = string(workload.Phase)
backend.Metadata["namespace"] = workload.Namespace

logger.Debugf("Discovered backend %s: %s (%s) with health status %s",
backend.ID, backend.BaseURL, backend.TransportType, backend.HealthStatus)

return &backend
}

// mapK8SWorkloadPhaseToHealth converts a MCPServerPhase to a backend health status.
func mapK8SWorkloadPhaseToHealth(phase mcpv1alpha1.MCPServerPhase) vmcp.BackendHealthStatus {
switch phase {
case mcpv1alpha1.MCPServerPhaseRunning:
return vmcp.BackendHealthy
case mcpv1alpha1.MCPServerPhaseFailed:
return vmcp.BackendUnhealthy
case mcpv1alpha1.MCPServerPhaseTerminating:
return vmcp.BackendUnhealthy
case mcpv1alpha1.MCPServerPhasePending:
return vmcp.BackendUnknown
default:
return vmcp.BackendUnknown
}
}
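Picking up the review comment above about unifying the two discoverers, here is a rough, hypothetical sketch of a single discoverer built on a small runtime-abstraction interface. The names backendSource and unifiedBackendDiscoverer, and the ResolveBackend method, are invented for illustration and are not part of this PR; the sketch only restates the Discover flow that both implementations share (verify the group, list its workloads, convert each workload to a vmcp.Backend).

package aggregator

import (
	"context"
	"fmt"

	"github.com/stacklok/toolhive/pkg/groups"
	"github.com/stacklok/toolhive/pkg/vmcp"
)

// backendSource is a hypothetical abstraction over the runtime-specific parts:
// listing workload names in a group and resolving one name to a backend
// (or nil when the workload is not accessible, e.g. has no URL).
type backendSource interface {
	ListWorkloadsInGroup(ctx context.Context, groupRef string) ([]string, error)
	ResolveBackend(ctx context.Context, name, groupRef string) (*vmcp.Backend, error)
}

// unifiedBackendDiscoverer would replace cliBackendDiscoverer and k8sBackendDiscoverer,
// keeping the shared control flow in one place.
type unifiedBackendDiscoverer struct {
	groupsManager groups.Manager
	source        backendSource
}

func (d *unifiedBackendDiscoverer) Discover(ctx context.Context, groupRef string) ([]vmcp.Backend, error) {
	exists, err := d.groupsManager.Exists(ctx, groupRef)
	if err != nil {
		return nil, fmt.Errorf("failed to check if group exists: %w", err)
	}
	if !exists {
		return nil, fmt.Errorf("group %s not found", groupRef)
	}

	names, err := d.source.ListWorkloadsInGroup(ctx, groupRef)
	if err != nil {
		return nil, fmt.Errorf("failed to list workloads in group: %w", err)
	}

	backends := make([]vmcp.Backend, 0, len(names))
	for _, name := range names {
		backend, err := d.source.ResolveBackend(ctx, name, groupRef)
		if err != nil || backend == nil {
			// Skip workloads that cannot be resolved, as both discoverers do today.
			continue
		}
		backends = append(backends, *backend)
	}
	return backends, nil
}

Under this shape, the CLI and Kubernetes packages would each supply a small backendSource implementation, and the factory in discoverer_factory.go would choose between them instead of between two full discoverers.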