Skip to content

fix(scheduler): Scheduler wait on server connect #5930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 41 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f74b510
start envoy xDS server last
Sep 13, 2024
cf951b8
orginise starter cmd for scheduler
Sep 13, 2024
c0dba90
Add synchroniser interface and simple timerbased impl
Sep 13, 2024
910bd6d
seperate out logic when agent connects
Sep 13, 2024
466234b
integrate simple synchroniser in code
Sep 13, 2024
0ec9e99
add first sync option to ServerNotify
Sep 13, 2024
e18a89e
Adjust controller to set isFirstSync
Sep 13, 2024
911e1ff
allow servernotify initial sync to start the sync process in scheduler
Sep 13, 2024
b49389c
rename start to signal in synchroniser
Sep 16, 2024
5acae2f
fix test
Sep 16, 2024
7d05519
add test for ServerNotify
Sep 16, 2024
a844fa5
change interface to allow for number of signals to wait for
Sep 16, 2024
289ad9e
Changes from #5886 (to include server events)
Sep 16, 2024
5994b9a
add testing for servers (and other events) in hub
Sep 17, 2024
29f69b1
add test for AddServerReplica
Sep 17, 2024
f332a6a
add server sync impl
Sep 17, 2024
f08d21b
Add test for server sync
Sep 18, 2024
80fbf62
add more testing coverage
Sep 18, 2024
9ac48a8
Add logging
Sep 18, 2024
72c6058
Wireup server changes in starter cmd
Sep 18, 2024
9b997d0
Add extra logging
Sep 18, 2024
564d1cc
Skip logging if not required
Sep 18, 2024
9f8676e
Start timer from the begining.
Sep 18, 2024
e87a7d0
tidy up logic and add more tests
Sep 19, 2024
e4d1d34
wire up simple sync for the non k8s case
Sep 19, 2024
f27bb4e
use waut group instead of a channel for sync
Sep 23, 2024
3de7efc
tidy up log messages
Sep 23, 2024
d37e187
fint fixes
Sep 23, 2024
c4bb39b
set default timeout for scheduler readiness in docker-compose setup
Sep 23, 2024
98522fe
add explicit envar for scheduler ready timeout (compose)
Sep 23, 2024
27e0fe8
fix lint
Sep 23, 2024
c196324
fix test
Sep 23, 2024
f33d980
add architecture design at the start of the file for server sync
Sep 24, 2024
c1abd3b
Add new line
Sep 24, 2024
ecc8eec
tidy up name in test
Sep 24, 2024
995d78a
Add parametrisation for helm for SCHEDULER_READY_TIMEOUT_SECONDS
Sep 24, 2024
a329f91
add log message for variable
Sep 24, 2024
4ca58df
add note why xDS starts last
Sep 24, 2024
92bbf64
add extra wait for routes to be established.
Sep 24, 2024
3e31279
tidy up event hub code
Sep 24, 2024
2f66806
tidy up event handling code
Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
840 changes: 425 additions & 415 deletions apis/go/mlops/scheduler/scheduler.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions apis/mlops/scheduler/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ message ModelStatusRequest {

message ServerNotifyRequest {
repeated ServerNotify servers = 1;
bool isFirstSync = 2;
}

message ServerNotify {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,7 @@ spec:
- --db-path=/mnt/scheduler/db
- --allow-plaintxt=$(ALLOW_PLAINTXT)
- --kafka-config-path=/mnt/kafka/kafka.json
- --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -949,6 +950,8 @@ spec:
value: '{{ .Values.security.envoy.ssl.downstream.server.caPath }}'
- name: ENVOY_DOWNSTREAM_CLIENT_TLS_CA_LOCATION
value: '{{ .Values.security.envoy.ssl.downstream.server.clientCaPath }}'
- name: SCHEDULER_READY_TIMEOUT_SECONDS
value: '{{ .Values.scheduler.schedulerReadyTimeoutSeconds }}'
- name: ALLOW_PLAINTXT
value: "true"
- name: POD_NAMESPACE
Expand Down
1 change: 1 addition & 0 deletions k8s/helm-charts/seldon-core-v2-setup/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ scheduler:
runAsUser: 1000
runAsGroup: 1000
runAsNonRoot: true
schedulerReadyTimeoutSeconds: 600

serverConfig:
terminationGracePeriodSeconds: 120
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ scheduler:
runAsUser: 1000
runAsGroup: 1000
runAsNonRoot: true
schedulerReadyTimeoutSeconds: 600

serverConfig:
terminationGracePeriodSeconds: 120
Expand Down
2 changes: 2 additions & 0 deletions k8s/kustomize/helm-components-sc/patch_scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ spec:
value: '{{ .Values.security.envoy.ssl.downstream.server.caPath }}'
- name: ENVOY_DOWNSTREAM_CLIENT_TLS_CA_LOCATION
value: '{{ .Values.security.envoy.ssl.downstream.server.clientCaPath }}'
- name: SCHEDULER_READY_TIMEOUT_SECONDS
value: '{{ .Values.scheduler.schedulerReadyTimeoutSeconds }}'
volumeClaimTemplates:
- name: scheduler-state
spec:
Expand Down
3 changes: 3 additions & 0 deletions k8s/yaml/components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ spec:
- --db-path=/mnt/scheduler/db
- --allow-plaintxt=$(ALLOW_PLAINTXT)
- --kafka-config-path=/mnt/kafka/kafka.json
- --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -590,6 +591,8 @@ spec:
value: '/tmp/certs/dds/ca.crt'
- name: ENVOY_DOWNSTREAM_CLIENT_TLS_CA_LOCATION
value: '/tmp/certs/ddc/ca.crt'
- name: SCHEDULER_READY_TIMEOUT_SECONDS
value: '600'
- name: ALLOW_PLAINTXT
value: "true"
- name: POD_NAMESPACE
Expand Down
3 changes: 3 additions & 0 deletions operator/config/seldonconfigs/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,14 @@ spec:
- --db-path=/mnt/scheduler/db
- --allow-plaintxt=$(ALLOW_PLAINTXT)
- --kafka-config-path=/mnt/kafka/kafka.json
- --scheduler-ready-timeout-seconds=$(SCHEDULER_READY_TIMEOUT_SECONDS)
command:
- /bin/scheduler
env:
- name: ALLOW_PLAINTXT
value: "true"
- name: SCHEDULER_READY_TIMEOUT_SECONDS
value: 600
- name: POD_NAMESPACE
valueFrom:
fieldRef:
Expand Down
2 changes: 1 addition & 1 deletion operator/controllers/mlops/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return reconcile.Result{}, nil
}

err := r.Scheduler.ServerNotify(ctx, nil, []v1alpha1.Server{*server})
err := r.Scheduler.ServerNotify(ctx, nil, []v1alpha1.Server{*server}, false)
if err != nil {
r.updateStatusFromError(ctx, logger, server, err)
return reconcile.Result{}, err
Expand Down
5 changes: 3 additions & 2 deletions operator/scheduler/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
)

func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler.SchedulerClient, servers []v1alpha1.Server) error {
func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler.SchedulerClient, servers []v1alpha1.Server, isFirstSync bool) error {
logger := s.logger.WithName("NotifyServer")
if len(servers) == 0 {
return nil
Expand Down Expand Up @@ -61,7 +61,8 @@ func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler
})
}
request := &scheduler.ServerNotifyRequest{
Servers: requests,
Servers: requests,
IsFirstSync: isFirstSync,
}
_, err := grpcClient.ServerNotify(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion operator/scheduler/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestServerNotify(t *testing.T) {
requests_servers: []*scheduler.ServerNotify{},
}
controller := newMockControllerClient()
err := controller.ServerNotify(context.Background(), &grpcClient, test.servers)
err := controller.ServerNotify(context.Background(), &grpcClient, test.servers, false)
g.Expect(err).To(BeNil())

if len(test.servers) != 0 {
Expand Down
2 changes: 1 addition & 1 deletion operator/scheduler/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func handleRegisteredServers(
return
}

if err := s.ServerNotify(ctx, grpcClient, serverList.Items); err != nil {
if err := s.ServerNotify(ctx, grpcClient, serverList.Items, true); err != nil {
s.logger.Error(err, "Failed to notify servers", "servers", serverList.Items)
}
}
Expand Down
2 changes: 2 additions & 0 deletions scheduler/all-host-network.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ services:
- --disable-autoscaling
- "--kafka-config-path"
- "/mnt/config/kafka-host.json"
- "--scheduler-ready-timeout-seconds"
- ${SCHEDULER_READY_TIMEOUT_SECONDS}
volumes:
- type: bind
source: ./config
Expand Down
2 changes: 2 additions & 0 deletions scheduler/all-internal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ services:
- --disable-autoscaling
- "--kafka-config-path"
- "/mnt/config/kafka-internal.json"
- "--scheduler-ready-timeout-seconds"
- ${SCHEDULER_READY_TIMEOUT_SECONDS}
volumes:
- type: bind
source: ./config
Expand Down
108 changes: 74 additions & 34 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,34 @@ import (
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/synchroniser"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/tracing"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)

var (
envoyPort uint
agentPort uint
agentMtlsPort uint
schedulerPort uint
schedulerMtlsPort uint
chainerPort uint
namespace string
pipelineGatewayHost string
pipelineGatewayHttpPort int
pipelineGatewayGrpcPort int
logLevel string
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool //scheduler server
autoscalingDisabled bool
kafkaConfigPath string
envoyPort uint
agentPort uint
agentMtlsPort uint
schedulerPort uint
schedulerMtlsPort uint
chainerPort uint
namespace string
pipelineGatewayHost string
pipelineGatewayHttpPort int
pipelineGatewayGrpcPort int
logLevel string
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool //scheduler server
autoscalingDisabled bool
kafkaConfigPath string
schedulerReadyTimeoutSeconds uint
)

const (
xDSWaitTimeout = time.Duration(10 * time.Second)
)

func init() {
Expand Down Expand Up @@ -98,12 +104,17 @@ func init() {

// Whether to enable autoscaling, default is true
flag.BoolVar(&autoscalingDisabled, "disable-autoscaling", false, "Disable autoscaling feature")

// Kafka config path
flag.StringVar(
&kafkaConfigPath,
"kafka-config-path",
"/mnt/config/kafka.json",
"Path to kafka configuration file",
)

// Timeout for scheduler to be ready
flag.UintVar(&schedulerReadyTimeoutSeconds, "scheduler-ready-timeout-seconds", 300, "Timeout for scheduler to be ready")
}

func getNamespace() string {
Expand Down Expand Up @@ -137,6 +148,8 @@ func main() {
logger.Infof("Setting log level to %s", logLevel)
logger.SetLevel(logIntLevel)

logger.Debugf("Scheduler ready timeout is set to %d seconds", schedulerReadyTimeoutSeconds)

done := make(chan bool, 1)

namespace = getNamespace()
Expand All @@ -149,16 +162,8 @@ func main() {
defer eventHub.Close()
go makeSignalHandler(logger, done)

// Start xDS server
// Create a cache
xdsCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger)
ctx := context.Background()
srv := envoyServerControlPlaneV3.NewServer(ctx, xdsCache, nil)
xdsServer := envoyServer.NewXDSServer(srv, logger)
err = xdsServer.StartXDSServer(envoyPort)
if err != nil {
log.WithError(err).Fatalf("Failed to start envoy xDS server")
}

tracer, err := tracing.NewTraceProvider("seldon-scheduler", &tracingConfigPath, logger)
if err != nil {
Expand All @@ -167,6 +172,7 @@ func main() {
defer tracer.Stop()
}

// Create stores
ss := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
ps := pipeline.NewPipelineStore(logger, eventHub, ss)
es := experiment.NewExperimentServer(logger, eventHub, ss, ps)
Expand All @@ -178,19 +184,13 @@ func main() {
GrpcPort: pipelineGatewayGrpcPort,
}

// Create envoy incremental processor
_, err = processor.NewIncrementalProcessor(xdsCache, nodeID, logger, ss, es, ps, eventHub, &pipelineGatewayDetails, cleaner)
if err != nil {
log.WithError(err).Fatalf("Failed to create incremental processor")
}

sched := scheduler.NewSimpleScheduler(
logger,
ss,
scheduler.DefaultSchedulerConfig(ss),
)
logger.Infof("Autoscaling service is set to %t", !autoscalingDisabled)
as := agent.NewAgentServer(logger, ss, sched, eventHub, !autoscalingDisabled)

// scheduler <-> dataflow grpc
dataFlowLoadBalancer := util.NewRingLoadBalancer(1)
kafkaConfigMap, err := config.NewKafkaConfig(kafkaConfigPath)
if err != nil {
Expand Down Expand Up @@ -223,17 +223,57 @@ func main() {
log.Warn("Not running with scheduler local DB")
}

s := schedulerServer.NewSchedulerServer(logger, ss, es, ps, sched, eventHub)
// Setup synchroniser
var sync synchroniser.Synchroniser

if namespace == "" {
// running outside k8s
sync = synchroniser.NewSimpleSynchroniser(time.Duration(schedulerReadyTimeoutSeconds) * time.Second)
} else {
sync = synchroniser.NewServerBasedSynchroniser(eventHub, logger, time.Duration(schedulerReadyTimeoutSeconds)*time.Second)
}

// scheduler scheduling models service
sched := scheduler.NewSimpleScheduler(
logger,
ss,
scheduler.DefaultSchedulerConfig(ss),
sync,
)

// scheduler <-> controller grpc
s := schedulerServer.NewSchedulerServer(logger, ss, es, ps, sched, eventHub, sync)
err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort)
if err != nil {
log.WithError(err).Fatalf("Failed to start server gRPC servers")
}

// scheduler <-> agent grpc
logger.Infof("Autoscaling service is set to %t", !autoscalingDisabled)
as := agent.NewAgentServer(logger, ss, sched, eventHub, !autoscalingDisabled)
err = as.StartGrpcServer(allowPlaintxt, agentPort, agentMtlsPort)
if err != nil {
log.WithError(err).Fatalf("Failed to start agent gRPC server")
}

// wait for model servers to be ready
sync.WaitReady()

// extra wait to allow routes state to get created
time.Sleep(xDSWaitTimeout)

// Start envoy xDS server, this is done after the scheduler is ready
// so that the xDS server can start sending valid updates to envoy.
ctx := context.Background()
srv := envoyServerControlPlaneV3.NewServer(ctx, xdsCache, nil)
xdsServer := envoyServer.NewXDSServer(srv, logger)
err = xdsServer.StartXDSServer(envoyPort)
if err != nil {
log.WithError(err).Fatalf("Failed to start envoy xDS server")
}

log.Info("Scheduler is ready")

// Wait for completion
<-done

Expand Down
2 changes: 2 additions & 0 deletions scheduler/env.all
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ HODOMETER_RECEIVER_LOG_LEVEL=info
KAFKA_MESSAGE_MAX_BYTES=1000000000

OTEL_EXPORTER_OTLP_PROTOCOL=grpc

SCHEDULER_READY_TIMEOUT_SECONDS=30
17 changes: 8 additions & 9 deletions scheduler/pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,12 @@ func (s *Server) Subscribe(request *pb.AgentSubscribeRequest, stream pb.AgentSer
}
s.mutex.Unlock()

err := s.syncMessage(request)
s.logger.Debugf("Add Server Replica %+v with config %+v", request, request.ReplicaConfig)
err := s.store.AddServerReplica(request)
if err != nil {
return err
}
err = s.scheduleModelsFromRequest(request)
if err != nil {
return err
}
Expand Down Expand Up @@ -421,13 +426,7 @@ func (s *Server) StopAgentStreams() {
}
}

func (s *Server) syncMessage(request *pb.AgentSubscribeRequest) error {
s.logger.Debugf("Add Server Replica %+v with config %+v", request, request.ReplicaConfig)
err := s.store.AddServerReplica(request)
if err != nil {
return err
}

func (s *Server) scheduleModelsFromRequest(request *pb.AgentSubscribeRequest) error {
// we have to reschedule models that are loaded on the incoming agent
// this is because we can have a network glitch that causes the communication between the agent and the scheduler
// to drop and the scheduler loading the models on other servers.
Expand All @@ -439,7 +438,7 @@ func (s *Server) syncMessage(request *pb.AgentSubscribeRequest) error {
}
}

_, err = s.scheduler.ScheduleFailedModels()
_, err := s.scheduler.ScheduleFailedModels()
if err != nil {
return err
}
Expand Down
Loading
Loading