Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions internal/kgateway/extensions2/plugins/backend/ai/ai_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"maps"
"os"

envoyclusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoycorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoyendpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoyroutev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
envoy_ext_proc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3"
envoy_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
envoytransformation "github.com/solo-io/envoy-gloo/go/config/filter/http/transformation/v2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -28,6 +31,16 @@ type IR struct {
AIMultiSecret map[string]*ir.Secret
Transformation *envoytransformation.RouteTransformations
Extproc *envoy_ext_proc_v3.ExtProcPerRoute
// Pre-computed cluster configuration
ClusterConfig *AIClusterConfig
}

// AIClusterConfig holds pre-computed cluster configuration for AI backends.
// It is built once by PreprocessAIBackend and stored on the IR so that the
// envoy cluster can later be populated from it (see ProcessAI and
// AddUpstreamClusterHttpFilters) without recomputing it.
type AIClusterConfig struct {
// ClusterDiscoveryType is the discovery type ProcessAI assigns to the cluster.
ClusterDiscoveryType *envoyclusterv3.Cluster_Type
// TransportSocketMatches are the transport socket matches ProcessAI assigns
// to the cluster.
TransportSocketMatches []*envoyclusterv3.Cluster_TransportSocketMatch
// LoadAssignment holds the prioritized endpoints. Its ClusterName is left
// unset here; ProcessAI clones the message and fills the name in on the copy.
LoadAssignment *envoyendpointv3.ClusterLoadAssignment
// HttpFilters are the upstream HTTP filters produced by
// buildUpstreamClusterHttpFilters during preprocessing.
HttpFilters []*envoy_hcm.HttpFilter
}

func (i *IR) Equals(otherAIIr *IR) bool {
Expand Down Expand Up @@ -55,7 +68,51 @@ func (i *IR) Equals(otherAIIr *IR) bool {
if !proto.Equal(i.Transformation, otherAIIr.Transformation) {
return false
}
if !i.ClusterConfig.Equals(otherAIIr.ClusterConfig) {
return false
}
}
return true
}

// Equals reports whether c and other describe the same cluster configuration.
//
// Two nil configs are equal, and a nil config never equals a non-nil one.
// Slice fields must have the same length and pairwise-equal elements in the
// same order; protobuf messages are compared with proto.Equal.
func (c *AIClusterConfig) Equals(other *AIClusterConfig) bool {
	switch {
	case c == nil && other == nil:
		return true
	case c == nil, other == nil:
		return false
	}

	// Discovery type: either both unset, or both set to the same type.
	mine, theirs := c.ClusterDiscoveryType, other.ClusterDiscoveryType
	switch {
	case mine == nil && theirs == nil:
		// nothing to compare
	case mine == nil, theirs == nil:
		return false
	case mine.Type != theirs.Type:
		return false
	}

	// Transport socket matches, compared position by position.
	if len(other.TransportSocketMatches) != len(c.TransportSocketMatches) {
		return false
	}
	for idx, match := range c.TransportSocketMatches {
		if !proto.Equal(match, other.TransportSocketMatches[idx]) {
			return false
		}
	}

	// HTTP filters, compared position by position.
	if len(other.HttpFilters) != len(c.HttpFilters) {
		return false
	}
	for idx, filter := range c.HttpFilters {
		if !proto.Equal(filter, other.HttpFilters[idx]) {
			return false
		}
	}

	// Everything else matched, so the load assignment decides the result.
	return proto.Equal(c.LoadAssignment, other.LoadAssignment)
}

Expand Down Expand Up @@ -93,13 +150,17 @@ func ApplyAIBackend(ir *IR, pCtx *ir.RouteBackendContext, out *envoyroutev3.Rout
return nil
}

func PreprocessAIBackend(ctx context.Context, aiBackend *v1alpha1.AIBackend, ir *IR) error {
func PreprocessAIBackend(ctx context.Context, aiBackend *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecrets map[string]*ir.Secret, aiIr *IR) error {
if aiBackend == nil {
return nil
}

// Setup ext-proc route filter config, we will conditionally modify it based on certain route options.
// A heavily used part of this config is the `GrpcInitialMetadata`.
// This is used to add headers to the ext-proc request.
// These headers are used to configure the AI server on a per-request basis.
// This was the best available way to pass per-route configuration to the AI server.
extProcRouteSettings := ir.Extproc
extProcRouteSettings := aiIr.Extproc
if extProcRouteSettings == nil {
extProcRouteSettings = &envoy_ext_proc_v3.ExtProcPerRoute{
Override: &envoy_ext_proc_v3.ExtProcPerRoute_Overrides{
Expand Down Expand Up @@ -151,7 +212,7 @@ func PreprocessAIBackend(ctx context.Context, aiBackend *v1alpha1.AIBackend, ir
Transformations: []*envoytransformation.RouteTransformations_RouteTransformation{routeTransformation},
}
// Store transformations in IR
ir.Transformation = transformations
aiIr.Transformation = transformations

extProcRouteSettings.GetOverrides().GrpcInitialMetadata = append(extProcRouteSettings.GetOverrides().GetGrpcInitialMetadata(),
&envoycorev3.HeaderValue{
Expand Down Expand Up @@ -180,7 +241,19 @@ func PreprocessAIBackend(ctx context.Context, aiBackend *v1alpha1.AIBackend, ir
)

// Store extproc settings in IR
ir.Extproc = extProcRouteSettings
aiIr.Extproc = extProcRouteSettings

// Build cluster configuration
clusterConfig, err := buildAIClusterConfig(aiBackend, aiSecret, multiSecrets)
if err != nil {
return err
}
filter, err := buildUpstreamClusterHttpFilters()
if err != nil {
return err
}
clusterConfig.HttpFilters = filter
aiIr.ClusterConfig = clusterConfig

return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestApplyAIBackend(t *testing.T) {
}
tt.pCtx = pCtx
aiIR := &IR{}
err := PreprocessAIBackend(context.Background(), tt.aiBackend, aiIR)
err := PreprocessAIBackend(context.Background(), tt.aiBackend, nil, nil, aiIR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably extend this test to pass secret and multiSecret, but that can be a follow-up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's take it as a follow-up.

if tt.expectedError != "" && err == nil {
t.Errorf("expected error but got nil")
} else if tt.expectedError == "" && err != nil {
Expand Down
22 changes: 11 additions & 11 deletions internal/kgateway/extensions2/plugins/backend/ai/ai_httpfilters.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ const (
upstreamCodecFilterName = "envoy.filters.http.upstream_codec"
)

func AddUpstreamClusterHttpFilters(out *envoyclusterv3.Cluster) error {
func buildUpstreamClusterHttpFilters() ([]*envoy_hcm.HttpFilter, error) {
transformationMsg, err := utils.MessageToAny(&envoytransformation.FilterTransformations{})
if err != nil {
return err
return nil, err
}

upstreamWaitMsg, err := utils.MessageToAny(&upstream_wait.UpstreamWaitFilterConfig{})
if err != nil {
return err
return nil, err
}

codecConfigAny, err := utils.MessageToAny(&envoy_upstream_codec.UpstreamCodec{})
if err != nil {
return fmt.Errorf("failed to create upstream codec config: %v", err)
return nil, fmt.Errorf("failed to create upstream codec config: %v", err)
}

// The order of the filters is important as AIPolicyTransformationFilterName must run before the AIBackendTransformationFilterName
Expand Down Expand Up @@ -74,7 +74,11 @@ func AddUpstreamClusterHttpFilters(out *envoyclusterv3.Cluster) error {
},
}

if err = translatorutils.MutateHttpOptions(out, func(opts *envoy_upstreams_v3.HttpProtocolOptions) {
return orderedFilters, nil
}

func AddUpstreamClusterHttpFilters(filters []*envoy_hcm.HttpFilter, out *envoyclusterv3.Cluster) error {
return translatorutils.MutateHttpOptions(out, func(opts *envoy_upstreams_v3.HttpProtocolOptions) {
ts := out.GetTransportSocket()
supportsALPN := false
if ts != nil {
Expand All @@ -101,12 +105,8 @@ func AddUpstreamClusterHttpFilters(out *envoyclusterv3.Cluster) error {
Seconds: 30,
},
}
opts.HttpFilters = append(opts.GetHttpFilters(), orderedFilters...)
}); err != nil {
return err
}

return nil
opts.HttpFilters = append(opts.GetHttpFilters(), filters...)
})
}

func AddExtprocHTTPFilter() ([]filters.StagedHttpFilter, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
envoytlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
envoytransformation "github.com/solo-io/envoy-gloo/go/config/filter/http/transformation/v2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -38,39 +39,38 @@ func tlsMatch() *structpb.Struct {
}
}

func ProcessAIBackend(in *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecrets map[string]*ir.Secret, out *envoyclusterv3.Cluster) error {
// buildAIClusterConfig pre-computes the cluster configuration for an AI backend.
// This function is used by the `PreprocessAIBackend` function to build the cluster configuration for the AI backend.
// It is ALSO used by `ProcessRoute` to create the cluster in the event of backup models being used
// and fallbacks being required.
func buildAIClusterConfig(in *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecrets map[string]*ir.Secret) (*AIClusterConfig, error) {
if in == nil {
return nil
return nil, nil
}

return buildModelCluster(in, aiSecret, multiSecrets, out)
}

// buildModelCluster builds a cluster for the given AI backend.
// This function is used by the `ProcessBackend` function to build the cluster for the AI backend.
// It is ALSO used by `ProcessRoute` to create the cluster in the event of backup models being used
// and fallbacks being required.
func buildModelCluster(aiUs *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecrets map[string]*ir.Secret, out *envoyclusterv3.Cluster) error {
// set the type to strict dns to support multi pool backends
out.ClusterDiscoveryType = &envoyclusterv3.Cluster_Type{
Type: envoyclusterv3.Cluster_STRICT_DNS,
config := &AIClusterConfig{
ClusterDiscoveryType: &envoyclusterv3.Cluster_Type{
Type: envoyclusterv3.Cluster_STRICT_DNS,
},
}

// We are reliant on https://github.com/envoyproxy/envoy/pull/34154 to merge
// before we can do OutlierDetection on 429s here
// out.OutlierDetection = getOutlierDetectionConfig(aiUs)

// Build endpoints
var prioritized []*envoyendpointv3.LocalityLbEndpoints
var err error
if aiUs.LLM != nil {
prioritized, err = buildLLMEndpoint(aiUs, aiSecret)
if in.LLM != nil {
prioritized, err = buildLLMEndpoint(in, aiSecret)
if err != nil {
return err
return nil, err
}
} else {
epByType := map[string]struct{}{}
prioritized = make([]*envoyendpointv3.LocalityLbEndpoints, 0, len(aiUs.PriorityGroups))
for idx, group := range aiUs.PriorityGroups {
prioritized = make([]*envoyendpointv3.LocalityLbEndpoints, 0, len(in.PriorityGroups))
for idx, group := range in.PriorityGroups {
eps := make([]*envoyendpointv3.LbEndpoint, 0, len(group.Providers))
for jdx, ep := range group.Providers {
var result *envoyendpointv3.LbEndpoint
Expand Down Expand Up @@ -116,7 +116,7 @@ func buildModelCluster(aiUs *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecre
slog.Error("bedrock on the AI backend are not supported yet, switch to agentgateway class")
}
if err != nil {
return err
return nil, err
}
eps = append(eps, result)
}
Expand All @@ -127,7 +127,7 @@ func buildModelCluster(aiUs *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecre
})
}
if len(epByType) > 1 {
return fmt.Errorf("multi backend pools must all be of the same type, got %v", epByType)
return nil, fmt.Errorf("multi backend pools must all be of the same type, got %v", epByType)
}
}

Expand All @@ -139,9 +139,9 @@ func buildModelCluster(aiUs *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecre
}
tlsCtxAny, err := utils.MessageToAny(tlsCtx)
if err != nil {
return err
return nil, err
}
out.TransportSocketMatches = append(out.GetTransportSocketMatches(), []*envoyclusterv3.Cluster_TransportSocketMatch{
config.TransportSocketMatches = []*envoyclusterv3.Cluster_TransportSocketMatch{
{
Name: "tls",
TransportSocket: &envoycorev3.TransportSocket{
Expand All @@ -164,13 +164,29 @@ func buildModelCluster(aiUs *v1alpha1.AIBackend, aiSecret *ir.Secret, multiSecre
},
Match: &structpb.Struct{},
},
}...)
out.LoadAssignment = &envoyendpointv3.ClusterLoadAssignment{
ClusterName: out.GetName(),
Endpoints: prioritized,
}

return nil
config.LoadAssignment = &envoyendpointv3.ClusterLoadAssignment{
// ClusterName will be set at translation time.
Endpoints: prioritized,
}

return config, nil
}

// ProcessAI applies the pre-computed AI cluster config to the envoy cluster.
//
// It copies the discovery type and transport socket matches from config onto
// out and, when a load assignment is present, installs a clone of it whose
// ClusterName is set to out's name.
//
// A nil config or nil out is a no-op. The config can legitimately be nil:
// PreprocessAIBackend returns early for a nil backend without populating
// ClusterConfig, and buildAIClusterConfig returns a nil config for nil input.
func ProcessAI(config *AIClusterConfig, out *envoyclusterv3.Cluster) {
	// Guard against dereferencing a config that was never computed, or
	// writing through a missing cluster.
	if config == nil || out == nil {
		return
	}

	// Apply cluster discovery type
	out.ClusterDiscoveryType = config.ClusterDiscoveryType

	// Apply transport socket matches
	out.TransportSocketMatches = config.TransportSocketMatches

	if config.LoadAssignment != nil {
		// clone needed to avoid adding cluster name to original object in the IR.
		out.LoadAssignment = proto.Clone(config.LoadAssignment).(*envoyendpointv3.ClusterLoadAssignment)
		out.LoadAssignment.ClusterName = out.GetName()
	}
}

func buildLLMEndpoint(aiUs *v1alpha1.AIBackend, aiSecrets *ir.Secret) ([]*envoyendpointv3.LocalityLbEndpoints, error) {
Expand Down
Loading