Skip to content

Commit dd4ce52

Browse files
author
Sherif Akoush
authored
fix: Deal with deleted experiments when restoring from cache (SeldonIO#5726)
* remove dead code path * skip restoring an experiment if there is an error. * add a note that we do not validate pipelines when we restore them * deal with deleted experiments on restore * use a call back for deleted experiments * add test for multiple experiments in db * update store to mark deleted experiments * add experiment get (for testing) * Add active field in experiment protos * add deleted instead of active * make deleted field not optional * handle deleted in controller for experiments * fix restoring of experiments * add compare for the entire proto * add pipeline get from db helper (for testing) * add test for db check after adding pipeline * add testing coverage * revert changes to operator as they are not required anymore * add experiment db migration helper * reinstate delete helper for DBss * simplify get from DB * add testing for delete from db * add scafolding to get the version from the (experiment) db * use `dropall` helper to clear db * optimize how to migrate to the new version * refactor common code to utils * add version to pipelinedb * add helper to get the number of experiments * add helper to count the number of expriments from the scheduler * handle load experiments on startup of controller. * remove finalizers for experiments if there are no experiments from scheduler * simplify removing finalizers for experiments * add tests for experiments utils * refactor model handlers and add tests * add pipeline handlers and tests * add helper to get pipeline status from scheduler * Add status subresoruce to fake client * pass grpc client instead of conn to subscriptions * add test for pipeline subscription * add a test for pipeline termination * add experiment tests * add test case for pipelines * check pipelineready status * add a test case when pipeline is removed * add note about expected state * add 2 extra test cases to cover when the resource doesn exist in k8s * deal with errors * update copyright * revert back upgrade to protoc * use grpc client in StopExperiment instead of the underlying connection * fix mis-spelled grpc vars in controller * use Be[True/False]Because pattern in experiments and pipelines tests * rename function for current db migration * fix mispelling pipeline->experiment
1 parent 12ab1fa commit dd4ce52

File tree

26 files changed

+3497
-216
lines changed

26 files changed

+3497
-216
lines changed

apis/go/mlops/scheduler/storage.pb.go

Lines changed: 95 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/mlops/scheduler/storage.proto

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,12 @@ message PipelineSnapshot {
1111
uint32 lastVersion = 2;
1212
repeated PipelineWithState versions = 3;
1313
bool deleted = 4;
14-
}
14+
}
15+
16+
message ExperimentSnapshot {
17+
Experiment experiment = 1;
18+
// to mark the experiment as deleted, this is currently required as we persist all
19+
// experiments in the local scheduler state (badgerdb) so that events can be replayed
20+
// on restart, which would guard against lost events in communication.
21+
bool deleted = 2;
22+
}

operator/controllers/mlops/experiment_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr.
5151
} else { // experiment is being deleted
5252
if utils.ContainsStr(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) {
5353
// Handle unload in scheduler
54-
if err, retry := r.Scheduler.StopExperiment(ctx, experiment); err != nil {
54+
if retry, err := r.Scheduler.StopExperiment(ctx, experiment, nil); err != nil {
5555
if retry {
5656
return true, err
5757
} else {
@@ -102,7 +102,7 @@ func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
102102
return reconcile.Result{}, err
103103
}
104104

105-
err, retry := r.Scheduler.StartExperiment(ctx, experiment)
105+
retry, err := r.Scheduler.StartExperiment(ctx, experiment, nil)
106106
if err != nil {
107107
r.updateStatusFromError(ctx, logger, experiment, err)
108108
if retry {

operator/controllers/mlops/pipeline_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
112112
return reconcile.Result{}, err
113113
}
114114

115-
err, retry := r.Scheduler.LoadPipeline(ctx, pipeline)
115+
retry, err := r.Scheduler.LoadPipeline(ctx, pipeline, nil)
116116
if err != nil {
117117
r.updateStatusFromError(ctx, logger, pipeline, retry, err)
118118
if retry {

operator/pkg/utils/testing/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ import (
1616
)
1717

1818
func NewFakeClient(scheme *runtime.Scheme, objs ...client.Object) client.Client {
19-
return fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
19+
return fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).WithStatusSubresource(objs...).Build()
2020
}

operator/scheduler/client.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ func (s *SchedulerClient) RemoveConnection(namespace string) {
132132

133133
// A smoke test allows us to quickly check if we actually have a functional grpc connection to the scheduler
134134
func (s *SchedulerClient) smokeTestConnection(conn *grpc.ClientConn) error {
135-
grcpClient := scheduler.NewSchedulerClient(conn)
135+
grpcClient := scheduler.NewSchedulerClient(conn)
136136

137-
stream, err := grcpClient.SubscribeModelStatus(context.TODO(), &scheduler.ModelSubscriptionRequest{SubscriberName: "seldon manager"}, grpc_retry.WithMax(1))
137+
stream, err := grpcClient.SubscribeModelStatus(context.TODO(), &scheduler.ModelSubscriptionRequest{SubscriberName: "seldon manager"}, grpc_retry.WithMax(1))
138138
if err != nil {
139139
return err
140140
}
@@ -253,7 +253,7 @@ func (s *SchedulerClient) checkErrorRetryable(resource string, resourceName stri
253253
}
254254

255255
func retryFn(
256-
fn func(context context.Context, conn *grpc.ClientConn, namespace string) error,
256+
fn func(context context.Context, grpcClient scheduler.SchedulerClient, namespace string) error,
257257
conn *grpc.ClientConn, namespace string, logger logr.Logger,
258258
) error {
259259
logger.Info("RetryFn", "namespace", namespace)
@@ -262,7 +262,8 @@ func retryFn(
262262
}
263263
backOffExp := backoff.NewExponentialBackOff()
264264
fnWithArgs := func() error {
265-
return fn(context.Background(), conn, namespace)
265+
grpcClient := scheduler.NewSchedulerClient(conn)
266+
return fn(context.Background(), grpcClient, namespace)
266267
}
267268
err := backoff.RetryNotify(fnWithArgs, backOffExp, logFailure)
268269
if err != nil {

operator/scheduler/experiment.go

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"io"
1515

1616
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
17-
"google.golang.org/grpc"
1817
v1 "k8s.io/api/core/v1"
1918
"k8s.io/client-go/util/retry"
2019
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -26,55 +25,77 @@ import (
2625
"github.com/seldonio/seldon-core/operator/v2/pkg/utils"
2726
)
2827

29-
func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alpha1.Experiment) (error, bool) {
28+
func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alpha1.Experiment, grpcClient scheduler.SchedulerClient) (bool, error) {
3029
logger := s.logger.WithName("StartExperiment")
31-
conn, err := s.getConnection(experiment.Namespace)
32-
if err != nil {
33-
return err, true
30+
var err error
31+
if grpcClient == nil {
32+
conn, err := s.getConnection(experiment.Namespace)
33+
if err != nil {
34+
return true, err
35+
}
36+
grpcClient = scheduler.NewSchedulerClient(conn)
3437
}
35-
grcpClient := scheduler.NewSchedulerClient(conn)
38+
3639
req := &scheduler.StartExperimentRequest{
3740
Experiment: experiment.AsSchedulerExperimentRequest(),
3841
}
3942
logger.Info("Start", "experiment name", experiment.Name)
40-
_, err = grcpClient.StartExperiment(
43+
_, err = grpcClient.StartExperiment(
4144
ctx,
4245
req,
4346
grpc_retry.WithMax(SchedulerConnectMaxRetries),
4447
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
4548
)
46-
return err, s.checkErrorRetryable(experiment.Kind, experiment.Name, err)
49+
return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
4750
}
4851

49-
func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alpha1.Experiment) (error, bool) {
52+
func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alpha1.Experiment, grpcClient scheduler.SchedulerClient) (bool, error) {
5053
logger := s.logger.WithName("StopExperiment")
51-
conn, err := s.getConnection(experiment.Namespace)
52-
if err != nil {
53-
return err, true
54+
var err error
55+
if grpcClient == nil {
56+
conn, err := s.getConnection(experiment.Namespace)
57+
if err != nil {
58+
return true, err
59+
}
60+
grpcClient = scheduler.NewSchedulerClient(conn)
5461
}
55-
grcpClient := scheduler.NewSchedulerClient(conn)
5662
req := &scheduler.StopExperimentRequest{
5763
Name: experiment.Name,
5864
}
5965
logger.Info("Stop", "experiment name", experiment.Name)
60-
_, err = grcpClient.StopExperiment(
66+
_, err = grpcClient.StopExperiment(
6167
ctx,
6268
req,
6369
grpc_retry.WithMax(SchedulerConnectMaxRetries),
6470
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
6571
)
66-
return err, s.checkErrorRetryable(experiment.Kind, experiment.Name, err)
72+
return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
6773
}
6874

6975
// namespace is not used in this function
70-
func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, conn *grpc.ClientConn, namespace string) error {
76+
func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error {
7177
logger := s.logger.WithName("SubscribeExperimentEvents")
72-
grcpClient := scheduler.NewSchedulerClient(conn)
7378

74-
stream, err := grcpClient.SubscribeExperimentStatus(ctx, &scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"}, grpc_retry.WithMax(1))
79+
stream, err := grpcClient.SubscribeExperimentStatus(ctx, &scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"}, grpc_retry.WithMax(1))
7580
if err != nil {
7681
return err
7782
}
83+
84+
// get experiments from the scheduler
85+
// if there are no experiments in the scheduler state then we need to create them
86+
// this is likely because of a restart of the scheduler that migrated the state
87+
// to v2 (where we delete the experiments from the scheduler state)
88+
numExperimentsFromScheduler, err := getNumExperimentsFromScheduler(ctx, grpcClient)
89+
if err != nil {
90+
return err
91+
}
92+
// if there are no experiments in the scheduler state then we need to create them if they exist in k8s
93+
// also remove finalizers from experiments that are being deleted
94+
if numExperimentsFromScheduler == 0 {
95+
handleLoadedExperiments(ctx, namespace, s, grpcClient)
96+
handlePendingDeleteExperiments(ctx, namespace, s)
97+
}
98+
7899
for {
79100
event, err := stream.Recv()
80101
if err != nil {

0 commit comments

Comments
 (0)