fix: Deal with deleted experiments when restoring from cache #5726

Merged: 66 commits, Jul 15, 2024
Changes from 59 commits

Commits
5192a92
remove dead code path
Jun 27, 2024
0edf6d6
skip restoring an experiment if there is an error.
Jun 27, 2024
86c6f69
add a note that we do not validate pipelines when we restore them
Jun 27, 2024
e53f83a
Add test
Jun 27, 2024
7a4aa22
fix fmt
Jun 27, 2024
7a1762e
update note in code
Jun 27, 2024
bc16c71
fix bug in tag
Jun 27, 2024
df7081a
deal with deleted experiments on restore
Jun 27, 2024
8aa59d5
use a call back for deleted experiments
Jun 27, 2024
f5d2809
add test for multiple experiments in db
Jun 27, 2024
999080a
update store to mark deleted experiments
Jun 27, 2024
7036911
add experiment get (for testing)
Jun 28, 2024
15f8b0e
Add active field in experiment protos
Jun 28, 2024
c077884
add deleted instead of active
Jul 1, 2024
a5cc725
make deleted field not optional
Jul 1, 2024
128a936
handle deleted in controller for experiments
Jul 1, 2024
08faff9
fix restoring of experiments
Jul 1, 2024
9770e85
add compare for the entire proto
Jul 1, 2024
2bc7b18
add pipeline get from db helper (for testing)
Jul 1, 2024
5f4a4d0
fix lint
Jul 1, 2024
5a5fcca
add test for db check after adding pipeline
Jul 1, 2024
63723a4
add nil check from pipeline add
Jul 1, 2024
2a21835
introduce ExperimentSnapshot proto
Jul 2, 2024
71a8184
add testing coverage
Jul 2, 2024
049c8df
revert changes to operator as they are not required anymore
Jul 2, 2024
caa8fb3
add experiment db migration helper
Jul 2, 2024
365f8b3
reinstate delete helper for dbs
Jul 3, 2024
c4601ea
simplify get from DB
Jul 3, 2024
17d0b10
add testing for delete from db
Jul 3, 2024
7fea3cd
add scafolding to get the version from the (experiment) db
Jul 3, 2024
3bbf6f5
use dropall helper to clear db
Jul 3, 2024
9d7e4fa
optimize how to migrate to the new version
Jul 4, 2024
fdd4a5a
refactor common code to utils
Jul 4, 2024
1469702
add version to pipelinedb
Jul 4, 2024
942d56f
add helper to get the number of experiments
Jul 4, 2024
b0191af
add helper to count the number of expriments from the scheduler
Jul 5, 2024
dce7eee
handle load experiments on startup of controller.
Jul 5, 2024
eaf7780
remove finalizers for experiments if there are no experiments from sc…
Jul 5, 2024
351a0f8
simplify removing finalizers for experiments
Jul 5, 2024
b392dbb
add tests for experiments utils
Jul 10, 2024
150df7c
refactor model handlers and add tests
Jul 10, 2024
373c9d6
add pipeline handlers and tests
Jul 10, 2024
0c307dd
add helper to get pipeline status from scheduler
Jul 10, 2024
96f0162
Add status subresoruce to fake client
Jul 11, 2024
636bf0d
pass grpc client instead of conn to subscriptions
Jul 11, 2024
083d67e
tidy up code
Jul 11, 2024
1f0c8a6
add test for pipeline subscription
Jul 11, 2024
c49adc1
add a test for pipeline termination
Jul 11, 2024
d6cddbc
add experiment tests
Jul 11, 2024
006d270
add test case for pipelines
Jul 11, 2024
e00a6bc
check pipelineready status
Jul 11, 2024
afc43ce
add a test case when pipeline is removed
Jul 11, 2024
e57a4aa
add note about expected state
Jul 11, 2024
81f4276
add 2 extra test cases to cover when the resource doesn exist in k8s
Jul 11, 2024
667944c
fix lint
Jul 11, 2024
b1415e7
deal with errors
Jul 11, 2024
8b54833
add new line
Jul 11, 2024
ae0f70d
update copyright
Jul 11, 2024
a544528
revert back upgrade to protoc
Jul 12, 2024
76b0f1f
use grpc client in StopExperiment instead of the underlying connection
Jul 15, 2024
adb2ee8
fix mis-spelled grpc vars in controller
Jul 15, 2024
5946b88
use Be[True/False]Because pattern in experiments and pipelines tests
Jul 15, 2024
ba6223f
rename function for current db migration
Jul 15, 2024
d6ceeb2
remove empty line
Jul 15, 2024
a634b49
fix mispelling pipeline->experiment
Jul 15, 2024
e7ef7eb
fix spelling
Jul 15, 2024
110 changes: 95 additions & 15 deletions apis/go/mlops/scheduler/storage.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion apis/mlops/scheduler/storage.proto
@@ -11,4 +11,12 @@ message PipelineSnapshot {
   uint32 lastVersion = 2;
   repeated PipelineWithState versions = 3;
   bool deleted = 4;
-}
+}
+
+message ExperimentSnapshot {
+  Experiment experiment = 1;
+  // to mark the experiment as deleted, this is currently required as we persist all
+  // experiments in the local scheduler state (badgerdb) so that events can be replayed
+  // on restart, which would guard against lost events in communication.
+  bool deleted = 2;
+}
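
The `deleted` marker lets restore logic tell a tombstoned experiment apart from a live one, and the commit list above mentions both a callback for deleted experiments and skipping an experiment whose restore fails. A minimal sketch of that idea follows. It is not the scheduler's actual store code: `experimentSnapshot`, `restoreExperiments`, `errStartFailed`, and the two callbacks are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// experimentSnapshot is a hypothetical stand-in for the generated
// ExperimentSnapshot message; only the fields needed here are modelled.
type experimentSnapshot struct {
	Name    string
	Deleted bool
}

// errStartFailed is a made-up error used only by the demo in main.
var errStartFailed = errors.New("start failed")

// restoreExperiments replays persisted snapshots. Live entries go to start,
// tombstoned entries go to markDeleted, and a failure on one entry is
// collected rather than aborting the rest of the restore.
func restoreExperiments(
	snapshots []experimentSnapshot,
	start, markDeleted func(experimentSnapshot) error,
) []error {
	var errs []error
	for _, snap := range snapshots {
		handle := start
		if snap.Deleted {
			handle = markDeleted
		}
		if err := handle(snap); err != nil {
			errs = append(errs, fmt.Errorf("restore %q: %w", snap.Name, err))
		}
	}
	return errs
}

func main() {
	snapshots := []experimentSnapshot{
		{Name: "exp-a"},
		{Name: "exp-b", Deleted: true},
		{Name: "exp-c"},
	}
	start := func(s experimentSnapshot) error {
		if s.Name == "exp-c" {
			return errStartFailed
		}
		fmt.Println("started", s.Name)
		return nil
	}
	markDeleted := func(s experimentSnapshot) error {
		fmt.Println("kept as deleted", s.Name)
		return nil
	}
	for _, err := range restoreExperiments(snapshots, start, markDeleted) {
		fmt.Println("skipped:", err)
	}
}
```

Collecting errors instead of returning on the first one is a design choice of this sketch; it means one bad entry does not block the remaining experiments from being restored.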
4 changes: 2 additions & 2 deletions operator/controllers/mlops/experiment_controller.go
@@ -51,7 +51,7 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr.
 	} else { // experiment is being deleted
 		if utils.ContainsStr(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) {
 			// Handle unload in scheduler
-			if err, retry := r.Scheduler.StopExperiment(ctx, experiment); err != nil {
+			if retry, err := r.Scheduler.StopExperiment(ctx, experiment, nil); err != nil {
 				if retry {
 					return true, err
 				} else {
@@ -102,7 +102,7 @@ func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return reconcile.Result{}, err
 	}
 
-	err, retry := r.Scheduler.StartExperiment(ctx, experiment)
+	retry, err := r.Scheduler.StartExperiment(ctx, experiment, nil)
 	if err != nil {
 		r.updateStatusFromError(ctx, logger, experiment, err)
 		if retry {
2 changes: 1 addition & 1 deletion operator/controllers/mlops/pipeline_controller.go
@@ -112,7 +112,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return reconcile.Result{}, err
 	}
 
-	err, retry := r.Scheduler.LoadPipeline(ctx, pipeline)
+	retry, err := r.Scheduler.LoadPipeline(ctx, pipeline, nil)
 	if err != nil {
 		r.updateStatusFromError(ctx, logger, pipeline, retry, err)
 		if retry {
2 changes: 1 addition & 1 deletion operator/pkg/utils/testing/client.go
@@ -16,5 +16,5 @@ import (
 )
 
 func NewFakeClient(scheme *runtime.Scheme, objs ...client.Object) client.Client {
-	return fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
+	return fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).WithStatusSubresource(objs...).Build()
 }
5 changes: 3 additions & 2 deletions operator/scheduler/client.go
@@ -253,7 +253,7 @@ func (s *SchedulerClient) checkErrorRetryable(resource string, resourceName stri
 }
 
 func retryFn(
-	fn func(context context.Context, conn *grpc.ClientConn, namespace string) error,
+	fn func(context context.Context, grcpClient scheduler.SchedulerClient, namespace string) error,

Review comment (Contributor Author): This interface change is to facilitate testing.

 	conn *grpc.ClientConn, namespace string, logger logr.Logger,
 ) error {
 	logger.Info("RetryFn", "namespace", namespace)
@@ -262,7 +262,8 @@ func retryFn(
 	}
 	backOffExp := backoff.NewExponentialBackOff()
 	fnWithArgs := func() error {
-		return fn(context.Background(), conn, namespace)
+		grcpClient := scheduler.NewSchedulerClient(conn)
+		return fn(context.Background(), grcpClient, namespace)
 	}
 	err := backoff.RetryNotify(fnWithArgs, backOffExp, logFailure)
 	if err != nil {
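
Accepting a client interface rather than a raw connection means a test can pass in a fake that records calls, with no gRPC server involved. The sketch below illustrates that pattern under stated assumptions: `experimentAPI`, `fakeScheduler`, `startExperiment`, and the `dial` callback are hypothetical and much smaller than the generated `scheduler.SchedulerClient` interface, and treating every call error as retryable is a simplification (the PR code delegates that decision to `checkErrorRetryable`).

```go
package main

import (
	"errors"
	"fmt"
)

// experimentAPI is a hypothetical, cut-down stand-in for the generated
// scheduler client interface; the real one has many more methods.
type experimentAPI interface {
	StartExperiment(name string) error
}

// fakeScheduler records calls instead of talking to a scheduler over gRPC.
type fakeScheduler struct {
	started []string
	err     error // returned by every call when non-nil
}

func (f *fakeScheduler) StartExperiment(name string) error {
	if f.err != nil {
		return f.err
	}
	f.started = append(f.started, name)
	return nil
}

// startExperiment uses the injected client when one is given and only dials
// when client is nil. It returns (retry, err) in that order. Treating every
// call error as retryable is a simplification for this sketch.
func startExperiment(
	name string,
	client experimentAPI,
	dial func() (experimentAPI, error),
) (bool, error) {
	if client == nil {
		dialed, err := dial()
		if err != nil {
			return true, err
		}
		client = dialed
	}
	err := client.StartExperiment(name)
	return err != nil, err
}

func main() {
	fake := &fakeScheduler{}
	noDial := func() (experimentAPI, error) {
		return nil, errors.New("dial must not be called when a client is injected")
	}
	retry, err := startExperiment("exp-a", fake, noDial)
	fmt.Println(retry, err, fake.started)
}
```

Production callers pass `nil` for the client and let the function dial, which matches how the reconcilers in this PR call `StartExperiment(ctx, experiment, nil)`.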
50 changes: 36 additions & 14 deletions operator/scheduler/experiment.go
@@ -26,31 +26,38 @@ import (
 	"github.com/seldonio/seldon-core/operator/v2/pkg/utils"
 )
 
-func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alpha1.Experiment) (error, bool) {
+func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alpha1.Experiment, grpcClient scheduler.SchedulerClient) (bool, error) {
 	logger := s.logger.WithName("StartExperiment")
-	conn, err := s.getConnection(experiment.Namespace)
-	if err != nil {
-		return err, true
+	var err error
+	if grpcClient == nil {
+		conn, err := s.getConnection(experiment.Namespace)
+		if err != nil {
+			return true, err
+		}
+		grpcClient = scheduler.NewSchedulerClient(conn)
 	}
-	grcpClient := scheduler.NewSchedulerClient(conn)
+
 	req := &scheduler.StartExperimentRequest{
 		Experiment: experiment.AsSchedulerExperimentRequest(),
 	}
 	logger.Info("Start", "experiment name", experiment.Name)
-	_, err = grcpClient.StartExperiment(
+	_, err = grpcClient.StartExperiment(
 		ctx,
 		req,
 		grpc_retry.WithMax(SchedulerConnectMaxRetries),
 		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
 	)
-	return err, s.checkErrorRetryable(experiment.Kind, experiment.Name, err)
+	return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
 }
 
-func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alpha1.Experiment) (error, bool) {
+func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alpha1.Experiment, conn *grpc.ClientConn) (bool, error) {
 	logger := s.logger.WithName("StopExperiment")
-	conn, err := s.getConnection(experiment.Namespace)
-	if err != nil {
-		return err, true
+	var err error
+	if conn == nil {
+		conn, err = s.getConnection(experiment.Namespace)
+		if err != nil {
+			return true, err
+		}
 	}
 	grcpClient := scheduler.NewSchedulerClient(conn)
 	req := &scheduler.StopExperimentRequest{
@@ -63,18 +70,33 @@ func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alph
 		grpc_retry.WithMax(SchedulerConnectMaxRetries),
 		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
 	)
-	return err, s.checkErrorRetryable(experiment.Kind, experiment.Name, err)
+	return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
 }
 
 // namespace is not used in this function
-func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, conn *grpc.ClientConn, namespace string) error {
+func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grcpClient scheduler.SchedulerClient, namespace string) error {
 	logger := s.logger.WithName("SubscribeExperimentEvents")
-	grcpClient := scheduler.NewSchedulerClient(conn)
 
 	stream, err := grcpClient.SubscribeExperimentStatus(ctx, &scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"}, grpc_retry.WithMax(1))
 	if err != nil {
 		return err
 	}
+
+	// get experiments from the scheduler
+	// if there are no experiments in the scheduler state then we need to create them
+	// this is likely because of a restart of the scheduler that migrated the state
+	// to v2 (where we delete the experiments from the scheduler state)
+	numExperimentsFromScheduler, err := getNumExperimentsFromScheduler(ctx, grcpClient)
+	if err != nil {
+		return err
+	}
+	// if there are no experiments in the scheduler state then we need to create them if they exist in k8s
+	// also remove finalizers from experiments that are being deleted
+	if numExperimentsFromScheduler == 0 {

Review comment (Member): A general comment: I can see how state inconsistencies may also be introduced by someone deleting an Experiment from k8s (with manual removal of the finalizer) while the scheduler is down. When the scheduler comes back up, it will still have that experiment in its local db (and will start it), but the experiment is no longer in k8s. There is an argument that this is what you get if you delete finalizers manually, and that doing so should be avoided at all costs (however, one may know people who do things like that...).

+		handleLoadedExperiments(ctx, namespace, s, grcpClient)
+		handlePendingDeleteExperiments(ctx, namespace, s)
+	}
+
 	for {
 		event, err := stream.Recv()
 		if err != nil {
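
The added block in `SubscribeExperimentEvents` only acts when the scheduler reports zero experiments: it re-creates experiments that still exist in k8s and removes finalizers from those already being deleted. A rough sketch of that decision follows. It makes no claim about how `handleLoadedExperiments` or `handlePendingDeleteExperiments` are implemented; `k8sExperiment`, `experimentFinalizer`, `resyncIfSchedulerEmpty`, and `removeString` are hypothetical names, and stopping at the first load error is an assumption of this sketch.

```go
package main

import "fmt"

// k8sExperiment is a hypothetical, simplified view of an Experiment resource.
type k8sExperiment struct {
	Name       string
	Deleting   bool // true when a deletion timestamp is set
	Finalizers []string
}

// experimentFinalizer is a made-up finalizer name for this sketch.
const experimentFinalizer = "example.io/experiment-finalizer"

// resyncIfSchedulerEmpty does nothing unless the scheduler reports zero
// experiments. In that case it re-submits live experiments through load and
// strips the finalizer from experiments that are already being deleted.
// It returns how many experiments were loaded and how many were released.
func resyncIfSchedulerEmpty(
	numInScheduler int,
	experiments []*k8sExperiment,
	load func(name string) error,
) (loaded, released int, err error) {
	if numInScheduler != 0 {
		return 0, 0, nil
	}
	for _, exp := range experiments {
		if exp.Deleting {
			exp.Finalizers = removeString(exp.Finalizers, experimentFinalizer)
			released++
			continue
		}
		if loadErr := load(exp.Name); loadErr != nil {
			return loaded, released, fmt.Errorf("load %q: %w", exp.Name, loadErr)
		}
		loaded++
	}
	return loaded, released, nil
}

// removeString returns items without any occurrence of target.
func removeString(items []string, target string) []string {
	kept := make([]string, 0, len(items))
	for _, item := range items {
		if item != target {
			kept = append(kept, item)
		}
	}
	return kept
}

func main() {
	experiments := []*k8sExperiment{
		{Name: "live", Finalizers: []string{experimentFinalizer}},
		{Name: "dying", Deleting: true, Finalizers: []string{experimentFinalizer}},
	}
	load := func(name string) error {
		fmt.Println("loading", name)
		return nil
	}
	loaded, released, err := resyncIfSchedulerEmpty(0, experiments, load)
	fmt.Println(loaded, released, err)
}
```

Note that this sketch does not address the case raised in the review comment, where the scheduler still holds an experiment that no longer exists in k8s.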