
Commit ca025b3

Author: Sherif Akoush
fix: Add server keep alive enforcement policy (#6016)
* add server keep alive policy constants
* fix spelling mistakes in constants
* remove keep alive from agent->server
* scheduler server keepalive
* use the same constants for client and server keep alive settings
* add server (agent) keepalive
* add dataflow server keepalive
* set keepalive permits to false
* add kotlin client keepalive
* set operator client keepalive
* add note
* adjust path to file
* tidy up keepalive parameters
* make constants private
* add controller client exp backoff
* tidy up client exp backoff
* only output log when cleaning resources to reduce log spam
* tidy up imports
* extract parameters as constants
* remove specific grpc operator
* set grpc level retries to 100
* set grpc level retries to 100
* add envoy keepalive settings
* extract backoff parameters as constants
* fix lint
1 parent 294b5f8 commit ca025b3
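
Editor's note: the scheduler-side server changes that give the commit its title are not among the files shown below. As a rough orientation only (not the actual Seldon scheduler code), a keepalive enforcement policy on a grpc-go server looks like the following sketch; the variable names, the port, and the exact values are assumptions chosen to mirror the client settings in this diff (60s keepalive, 2s timeout, no pings without active streams).

package main

import (
    "log"
    "net"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

func main() {
    // Enforcement policy: reject clients that ping more often than MinTime,
    // or that ping while they have no active streams.
    kaep := keepalive.EnforcementPolicy{
        MinTime:             60 * time.Second, // should not exceed the clients' keepalive Time (60s here)
        PermitWithoutStream: false,            // mirrors clientKeepAlivePermit = false below
    }
    // Server-side keepalive pings towards idle clients.
    kasp := keepalive.ServerParameters{
        Time:    60 * time.Second,
        Timeout: 2 * time.Second,
    }
    srv := grpc.NewServer(
        grpc.KeepaliveEnforcementPolicy(kaep),
        grpc.KeepaliveParams(kasp),
    )
    lis, err := net.Listen("tcp", ":9004") // illustrative port, not from this commit
    if err != nil {
        log.Fatal(err)
    }
    log.Fatal(srv.Serve(lis))
}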

File tree

21 files changed (+141, -89 lines)


operator/scheduler/client.go

Lines changed: 23 additions & 10 deletions
@@ -33,11 +33,16 @@ import (

 const (
     // these 2 constants in combination with the backoff exponential function will give us a max backoff of 13.5 minutes
-    SchedulerConnectMaxRetries    = 12
-    SchedulerConnectBackoffScalar = 200 * time.Millisecond
-    ClientKeapAliveTime           = 60 * time.Second
-    ClientKeapAliveTimeout        = 2 * time.Second
-    ClientKeapAlivePermit         = true
+    schedulerConnectMaxRetries    = 100
+    schedulerConnectBackoffScalar = 200 * time.Millisecond
+    // these keep alive settings need to match the scheduler counterpart in scheduler/pkg/util/constants.go
+    clientKeepAliveTime    = 60 * time.Second
+    clientKeepAliveTimeout = 2 * time.Second
+    clientKeepAlivePermit  = false
+    // backoff
+    backoffMaxElapsedTime  = 0 // Never stop due to large time between calls
+    backOffMaxInterval     = time.Second * 15
+    backOffInitialInterval = time.Second
 )

 type SchedulerClient struct {
@@ -229,9 +234,9 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai
         }
     }
     kacp := keepalive.ClientParameters{
-        Time:                ClientKeapAliveTime,
-        Timeout:             ClientKeapAliveTimeout,
-        PermitWithoutStream: ClientKeapAlivePermit,
+        Time:                clientKeepAliveTime,
+        Timeout:             clientKeepAliveTimeout,
+        PermitWithoutStream: clientKeepAlivePermit,
     }

     retryOpts := []grpc_retry.CallOption{
@@ -249,7 +254,7 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai
         opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
         s.logger.Info("Running scheduler client in plain text mode", "port", port)
     }
-    opts = append(opts, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)))
+    // we dont have backoff retry on the grpc streams as we handle this in the event handlers
     opts = append(opts, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)))
     opts = append(opts, grpc.WithKeepaliveParams(kacp))
     s.logger.Info("Dialing scheduler", "host", host, "port", port)
@@ -313,7 +318,7 @@ func retryFn(
     logFailure := func(err error, delay time.Duration) {
         logger.Error(err, "Scheduler not ready")
     }
-    backOffExp := backoff.NewExponentialBackOff()
+    backOffExp := getClientExponentialBackoff()
     fnWithArgs := func() error {
         grpcClient := scheduler.NewSchedulerClient(conn)
         return fn(context.Background(), grpcClient, namespace)
@@ -325,3 +330,11 @@ func retryFn(
     }
     return nil
 }
+
+func getClientExponentialBackoff() *backoff.ExponentialBackOff {
+    backOffExp := backoff.NewExponentialBackOff()
+    backOffExp.MaxElapsedTime = backoffMaxElapsedTime
+    backOffExp.MaxInterval = backOffMaxInterval
+    backOffExp.InitialInterval = backOffInitialInterval
+    return backOffExp
+}
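
Editor's note on the numbers above: grpc_retry.BackoffExponential(scalar) waits on the order of scalar x 2^n before the n-th retry, so the previous configuration (200 ms scalar, 12 retries) topped out around 200 ms x 2^12, roughly 819 s, which is where the retained "13.5 minutes" comment comes from. With the limit raised to 100, the exponential delay would nominally grow far beyond any practical value, so in effect the unary calls keep retrying for as long as the call's context allows. The new cenkalti/backoff configuration used by retryFn starts at 1 s, grows exponentially (with the library's default multiplier and jitter) up to the 15 s cap, and never gives up on its own because MaxElapsedTime is 0.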

operator/scheduler/control_plane.go

Lines changed: 2 additions & 2 deletions
@@ -31,8 +31,8 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC
     stream, err := grpcClient.SubscribeControlPlane(
         ctx,
         &scheduler.ControlPlaneSubscriptionRequest{SubscriberName: "seldon manager"},
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return err

operator/scheduler/experiment.go

Lines changed: 6 additions & 6 deletions
@@ -43,8 +43,8 @@ func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alp
     _, err = grpcClient.StartExperiment(
         ctx,
         req,
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
 }
@@ -66,8 +66,8 @@ func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alph
     _, err = grpcClient.StopExperiment(
         ctx,
         req,
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
 }
@@ -79,8 +79,8 @@ func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcCli
     stream, err := grpcClient.SubscribeExperimentStatus(
         ctx,
         &scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"},
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return err

operator/scheduler/model.go

Lines changed: 6 additions & 6 deletions
@@ -60,8 +60,8 @@ func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model,
     _, err = grpcClient.LoadModel(
         ctx,
         &loadModelRequest,
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return s.checkErrorRetryable(model.Kind, model.Name, err), err
@@ -102,8 +102,8 @@ func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model
     _, err = grpcClient.UnloadModel(
         ctx,
         modelRef,
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return s.checkErrorRetryable(model.Kind, model.Name, err), err
@@ -117,8 +117,8 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient s
     stream, err := grpcClient.SubscribeModelStatus(
         ctx,
         &scheduler.ModelSubscriptionRequest{SubscriberName: "seldon manager"},
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return err

operator/scheduler/pipeline.go

Lines changed: 6 additions & 6 deletions
@@ -42,8 +42,8 @@ func (s *SchedulerClient) LoadPipeline(ctx context.Context, pipeline *v1alpha1.P
     _, err = grpcClient.LoadPipeline(
         ctx,
         &req,
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     return s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err), err
 }
@@ -62,8 +62,8 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1
     _, err = grpcClient.UnloadPipeline(
         ctx,
         &req,
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err)
@@ -85,8 +85,8 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, grpcClien
     stream, err := grpcClient.SubscribePipelineStatus(
         ctx,
         &scheduler.PipelineSubscriptionRequest{SubscriberName: "seldon manager"},
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return err

operator/scheduler/server.go

Lines changed: 4 additions & 4 deletions
@@ -64,8 +64,8 @@ func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler
     _, err := grpcClient.ServerNotify(
         ctx,
         request,
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         logger.Error(err, "Failed to send notify server to scheduler")
@@ -82,8 +82,8 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient
     stream, err := grpcClient.SubscribeServerStatus(
         ctx,
         &scheduler.ServerSubscriptionRequest{SubscriberName: "seldon manager"},
-        grpc_retry.WithMax(SchedulerConnectMaxRetries),
-        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+        grpc_retry.WithMax(schedulerConnectMaxRetries),
+        grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
     )
     if err != nil {
         return err

scheduler/config/envoy.yaml

Lines changed: 6 additions & 1 deletion
@@ -11,7 +11,12 @@ static_resources:
           socket_address:
             address: seldon-scheduler
             port_value: 9002
-    http2_protocol_options: {}
+    http2_protocol_options: {
+      connection_keepalive: {
+        interval: 60s,
+        timeout: 2s,
+      }
+    }
     name: xds_cluster
   - connect_timeout: 0.250s
     type: LOGICAL_DNS
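
Editor's note: the 60s interval and 2s timeout on Envoy's xDS connection to seldon-scheduler match the gRPC client keepalive settings used elsewhere in this commit, presumably so the scheduler's new keepalive enforcement policy treats Envoy's HTTP/2 pings the same way as pings from the other clients rather than rejecting the connection for pinging too often.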

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt

Lines changed: 4 additions & 0 deletions
@@ -37,6 +37,7 @@ import kotlinx.coroutines.flow.onCompletion
 import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.runBlocking
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.TimeUnit
 import io.klogging.logger as coLogger

 @OptIn(FlowPreview::class)
@@ -60,6 +61,9 @@ class PipelineSubscriber(
         .defaultServiceConfig(grpcServiceConfig)
         .usePlaintext() // Use TLS
         .enableRetry()
+        // these keep alive settings need to match the go counterpart in scheduler/pkg/util/constants.go
+        .keepAliveTime(60L, TimeUnit.SECONDS)
+        .keepAliveTimeout(2L, TimeUnit.SECONDS)
         .build()
     private val client = ChainerGrpcKt.ChainerCoroutineStub(channel)

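Editor's note: keepAliveWithoutCalls is not set on the Kotlin channel builder; to my understanding it defaults to false in gRPC Java, which would line up with clientKeepAlivePermit = false on the Go clients (an observation, not something stated in the diff).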

scheduler/pkg/agent/client.go

Lines changed: 3 additions & 9 deletions
@@ -24,7 +24,6 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials"
     "google.golang.org/grpc/credentials/insecure"
-    "google.golang.org/grpc/keepalive"
     "google.golang.org/protobuf/encoding/protojson"

     "github.com/seldonio/seldon-core/apis/go/v2/mlops/agent"
@@ -237,8 +236,7 @@ func (c *Client) Start() error {
     logFailure := func(err error, delay time.Duration) {
         c.logger.WithError(err).Errorf("Scheduler not ready")
     }
-    backOffExp := backoff.NewExponentialBackOff()
-    backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
+    backOffExp := util.GetClientExponentialBackoff()
     err := backoff.RetryNotify(c.StartService, backOffExp, logFailure)
     if err != nil {
         c.logger.WithError(err).Fatal("Failed to start client")
@@ -417,11 +415,7 @@ func (c *Client) getConnection(host string, plainTxtPort int, tlsPort int) (*grp

     logger.Infof("Connecting (non-blocking) to scheduler at %s:%d", host, port)

-    kacp := keepalive.ClientParameters{
-        Time:                util.ClientKeapAliveTime,
-        Timeout:             util.ClientKeapAliveTimeout,
-        PermitWithoutStream: util.ClientKeapAlivePermit,
-    }
+    kacp := util.GetClientKeepAliveParameters()

     opts := []grpc.DialOption{
         grpc.WithTransportCredentials(transCreds),
@@ -453,7 +447,7 @@ func (c *Client) StartService() error {
             Shared:               true,
             AvailableMemoryBytes: c.stateManager.GetAvailableMemoryBytesWithOverCommit(),
         },
-        grpc_retry.WithMax(1),
+        grpc_retry.WithMax(util.MaxGRPCRetriesOnStream),
     ) // TODO make configurable
     if err != nil {
         return err
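
Editor's note: the shared helpers the agent now calls (util.GetClientKeepAliveParameters, util.GetClientExponentialBackoff, util.MaxGRPCRetriesOnStream) live in scheduler/pkg/util and are not part of this excerpt. Based on the operator-side code above and the commit notes ("use the same constants for client and server keep alive settings", "make constants private", "set grpc level retries to 100"), they presumably look roughly like the sketch below; the exact names, values, and the backoff import path are assumptions, not the actual file.

package util

import (
    "time"

    "github.com/cenkalti/backoff/v4" // assumed module path/version
    "google.golang.org/grpc/keepalive"
)

const (
    // assumed to mirror operator/scheduler/client.go
    clientKeepAliveTime    = 60 * time.Second
    clientKeepAliveTimeout = 2 * time.Second
    clientKeepAlivePermit  = false

    // assumed value, per the "set grpc level retries to 100" commit note
    MaxGRPCRetriesOnStream = 100
)

// GetClientKeepAliveParameters returns the keepalive settings shared by the
// gRPC clients that connect to the scheduler.
func GetClientKeepAliveParameters() keepalive.ClientParameters {
    return keepalive.ClientParameters{
        Time:                clientKeepAliveTime,
        Timeout:             clientKeepAliveTimeout,
        PermitWithoutStream: clientKeepAlivePermit,
    }
}

// GetClientExponentialBackoff returns a backoff that never stops on its own
// and caps the wait between attempts at 15 seconds.
func GetClientExponentialBackoff() *backoff.ExponentialBackOff {
    b := backoff.NewExponentialBackOff()
    b.MaxElapsedTime = 0
    b.MaxInterval = 15 * time.Second
    b.InitialInterval = time.Second
    return b
}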

scheduler/pkg/agent/modelserver_controlplane/oip/v2.go

Lines changed: 0 additions & 8 deletions
@@ -19,7 +19,6 @@ import (
     "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials/insecure"
-    "google.golang.org/grpc/keepalive"
     "google.golang.org/grpc/status"

     v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"
@@ -51,19 +50,12 @@ func CreateV2GrpcConnection(v2Config V2Config) (*grpc.ClientConn, error) {
         grpc_retry.WithMax(v2Config.GRPRetryMaxCount),
     }

-    kacp := keepalive.ClientParameters{
-        Time:                util.ClientKeapAliveTime,
-        Timeout:             util.ClientKeapAliveTimeout,
-        PermitWithoutStream: util.ClientKeapAlivePermit,
-    }
-
     opts := []grpc.DialOption{
         grpc.WithTransportCredentials(insecure.NewCredentials()),
         grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(v2Config.GRPCMaxMsgSizeBytes), grpc.MaxCallSendMsgSize(v2Config.GRPCMaxMsgSizeBytes)),
         grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)),
         grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)),
         grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
-        grpc.WithKeepaliveParams(kacp),
     }
     conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", v2Config.Host, v2Config.GRPCPort), opts...)
     if err != nil {